aboutsummaryrefslogtreecommitdiffstats
path: root/config
diff options
context:
space:
mode:
authorPrabin Panta <pantaprabin30@gmail.com>2025-10-26 20:07:30 +0545
committerGitHub <noreply@github.com>2025-10-26 20:07:30 +0545
commit957b0db7dd3e9274aa67bb747ad5a00615f6b23a (patch)
tree31f706fc0f0f500ddba748efb9227aa7fb27d9b7 /config
parent53fe242ffdddf74bf13e32999027053ee4614b20 (diff)
parentf876fc42308b949e791e6f685fa9c6f32605667e (diff)
Merge pull request #2 from prabinpanta0/development-weather
Used code rabbit extension and github copilot for review
Diffstat (limited to 'config')
-rwxr-xr-xconfig/hypr/UserScripts/Weather.py688
1 files changed, 510 insertions, 178 deletions
diff --git a/config/hypr/UserScripts/Weather.py b/config/hypr/UserScripts/Weather.py
index ca1d5281..3c5d58f9 100755
--- a/config/hypr/UserScripts/Weather.py
+++ b/config/hypr/UserScripts/Weather.py
@@ -3,15 +3,42 @@
# Rewritten to use Open-Meteo APIs (worldwide, no API key) for robust weather data.
# Outputs Waybar-compatible JSON and a simple text cache.
+from __future__ import annotations
+
import json
import os
import sys
import time
import html
-from typing import Any, Dict, List, Optional, Tuple
-
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union, cast
+from typing import NamedTuple
import requests
+from dataclasses import dataclass
+
+@dataclass
+class Location:
+ lat: float
+ lon: float
+ place: Optional[str] = None
+
+
+@dataclass
+class WeatherData:
+ temp_str: str
+ feels_str: str
+ icon: str
+ status: str
+ min_max: str
+ wind_text: str
+ humidity_text: str
+ visibility_text: str
+ aqi_text: str
+ hourly_precip: str
+ is_day: int
+ code: int
+
# =============== Configuration ===============
# You can configure behavior via environment variables OR the constants below.
# Examples (zsh):
@@ -27,9 +54,9 @@ import requests
# export WEATHER_TOOLTIP_MARKUP=1 # 1 to enable Pango markup, 0 to disable
# export WEATHER_LOC_ICON="📍" # or "*" for ASCII-only
#
-CACHE_DIR = os.path.expanduser("~/.cache")
-API_CACHE_PATH = os.path.join(CACHE_DIR, "open_meteo_cache.json")
-SIMPLE_TEXT_CACHE_PATH = os.path.join(CACHE_DIR, ".weather_cache")
+CACHE_DIR: Path = Path.home() / ".cache"
+API_CACHE_PATH: Path = CACHE_DIR / "open_meteo_cache.json"
+SIMPLE_TEXT_CACHE_PATH: Path = CACHE_DIR / ".weather_cache"
CACHE_TTL_SECONDS = int(os.getenv("WEATHER_CACHE_TTL", "600")) # default 10 minutes
# Units: metric or imperial (default metric)
@@ -138,19 +165,68 @@ def log_debug(msg: str) -> None:
def ensure_cache_dir() -> None:
try:
- os.makedirs(CACHE_DIR, exist_ok=True)
+ # CACHE_DIR is a Path
+ CACHE_DIR.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"Error creating cache dir: {e}", file=sys.stderr)
+def _coerce_numeric(value: Any, to_int: bool) -> Optional[Union[int, float]]:
+ if to_int:
+ if isinstance(value, int):
+ return value
+ if isinstance(value, (float, str)):
+ try:
+ return int(float(value))
+ except (ValueError, TypeError):
+ return None
+ return None
+ else:
+ if isinstance(value, float):
+ return value
+ if isinstance(value, int):
+ return float(value)
+ if isinstance(value, str):
+ try:
+ return float(value)
+ except (ValueError, TypeError):
+ return None
+ return None
+
+
+def coerce_int(value: Any) -> Optional[int]:
+ return cast(Optional[int], _coerce_numeric(value, True))
+
+
+def coerce_float(value: Any) -> Optional[float]:
+ return cast(Optional[float], _coerce_numeric(value, False))
+
+
+def coerce_number(value: Any) -> Union[int, float, None]:
+ if isinstance(value, (int, float)):
+ return value
+ if isinstance(value, str):
+ try:
+ # Parse to float, then return int if it has no fractional part
+ f = float(value)
+ return int(f) if f.is_integer() else f
+ except (ValueError, TypeError):
+ return None
+ return None
+
+
def read_api_cache() -> Optional[Dict[str, Any]]:
try:
- if not os.path.exists(API_CACHE_PATH):
+ if not API_CACHE_PATH.exists():
return None
- with open(API_CACHE_PATH, "r", encoding="utf-8") as f:
+ with API_CACHE_PATH.open("r", encoding="utf-8") as f:
data = json.load(f)
- if (time.time() - data.get("timestamp", 0)) <= CACHE_TTL_SECONDS:
- return data
+ # Use ensure_dict for safety
+ data_dict = ensure_dict(data)
+ timestamp_val = data_dict.get("timestamp", 0)
+ timestamp = coerce_float(timestamp_val) or 0
+ if (time.time() - timestamp) <= CACHE_TTL_SECONDS:
+ return data_dict
return None
except Exception as e:
print(f"Error reading cache: {e}", file=sys.stderr)
@@ -161,7 +237,7 @@ def write_api_cache(payload: Dict[str, Any]) -> None:
try:
ensure_cache_dir()
payload["timestamp"] = time.time()
- with open(API_CACHE_PATH, "w", encoding="utf-8") as f:
+ with API_CACHE_PATH.open("w", encoding="utf-8") as f:
json.dump(payload, f)
except Exception as e:
print(f"Error writing API cache: {e}", file=sys.stderr)
@@ -170,34 +246,42 @@ def write_api_cache(payload: Dict[str, Any]) -> None:
def write_simple_text_cache(text: str) -> None:
try:
ensure_cache_dir()
- with open(SIMPLE_TEXT_CACHE_PATH, "w", encoding="utf-8") as f:
+ with SIMPLE_TEXT_CACHE_PATH.open("w", encoding="utf-8") as f:
f.write(text)
except Exception as e:
print(f"Error writing simple cache: {e}", file=sys.stderr)
-def get_coords() -> Tuple[float, float]:
- # 1) Explicit env
+def get_coords_from_env() -> Optional[Tuple[float, float]]:
if ENV_LAT and ENV_LON:
try:
return float(ENV_LAT), float(ENV_LON)
except ValueError:
print("Invalid WEATHER_LAT/WEATHER_LON; falling back to IP geolocation", file=sys.stderr)
+ return None
+
- # 2) Try cached coordinates from last successful forecast
+def get_coords_from_cache() -> Optional[Tuple[float, float]]:
try:
cached = read_api_cache()
- if cached and isinstance(cached, dict):
- fc = cached.get("forecast") or {}
- lat = fc.get("latitude")
- lon = fc.get("longitude")
- if isinstance(lat, (int, float)) and isinstance(lon, (int, float)):
- return float(lat), float(lon)
+ if cached:
+ fc = ensure_dict(cached.get("forecast"))
+ lat_raw = safe_get(fc, "latitude")
+ lon_raw = safe_get(fc, "longitude")
+ lat = coerce_float(lat_raw)
+ lon = coerce_float(lon_raw)
+ if lat is None:
+ log_debug(f"Unexpected type for cached latitude: {type(lat_raw)}")
+ if lon is None:
+ log_debug(f"Unexpected type for cached longitude: {type(lon_raw)}")
+ if lat is not None and lon is not None:
+ return lat, lon
except Exception as e:
print(f"Reading cached coords failed: {e}", file=sys.stderr)
+ return None
+
- # 3) IP-based geolocation with multiple providers (prefer ipwho.is, ipapi.co; ipinfo.io as fallback)
- # ipwho.is
+def get_coords_from_ipwho() -> Optional[Tuple[float, float]]:
try:
resp = SESSION.get("https://ipwho.is/", timeout=TIMEOUT)
resp.raise_for_status()
@@ -209,8 +293,10 @@ def get_coords() -> Tuple[float, float]:
return float(lat), float(lon)
except Exception as e:
print(f"ipwho.is failed: {e}", file=sys.stderr)
+ return None
- # ipapi.co
+
+def get_coords_from_ipapi() -> Optional[Tuple[float, float]]:
try:
resp = SESSION.get("https://ipapi.co/json", timeout=TIMEOUT)
resp.raise_for_status()
@@ -221,8 +307,10 @@ def get_coords() -> Tuple[float, float]:
return float(lat), float(lon)
except Exception as e:
print(f"ipapi.co failed: {e}", file=sys.stderr)
+ return None
- # ipinfo.io (fallback)
+
+def get_coords_from_ipinfo() -> Optional[Tuple[float, float]]:
try:
resp = SESSION.get("https://ipinfo.io/json", timeout=TIMEOUT)
resp.raise_for_status()
@@ -233,6 +321,24 @@ def get_coords() -> Tuple[float, float]:
return float(lat_s), float(lon_s)
except Exception as e:
print(f"ipinfo.io failed: {e}", file=sys.stderr)
+ return None
+
+
+def get_coords() -> Tuple[float, float]:
+ # 1) Explicit env
+ coords = get_coords_from_env()
+ if coords:
+ return coords
+
+ # 2) Try cached coordinates
+ coords = get_coords_from_cache()
+ if coords:
+ return coords
+
+ # 3) IP-based geolocation
+ coords = get_coords_from_ipwho() or get_coords_from_ipapi() or get_coords_from_ipinfo()
+ if coords:
+ return coords
# 4) Last resort
print("IP geolocation failed: no providers succeeded", file=sys.stderr)
@@ -272,7 +378,7 @@ def format_visibility(meters: Optional[float]) -> str:
def fetch_open_meteo(lat: float, lon: float) -> Dict[str, Any]:
base = "https://api.open-meteo.com/v1/forecast"
- params = {
+ params: Dict[str, Union[str, float]] = {
"latitude": lat,
"longitude": lon,
"current": "temperature_2m,apparent_temperature,relative_humidity_2m,wind_speed_10m,wind_direction_10m,weather_code,visibility,precipitation,pressure_msl,is_day",
@@ -289,7 +395,7 @@ def fetch_open_meteo(lat: float, lon: float) -> Dict[str, Any]:
def fetch_aqi(lat: float, lon: float) -> Optional[Dict[str, Any]]:
try:
base = "https://air-quality-api.open-meteo.com/v1/air-quality"
- params = {
+ params: Dict[str, Union[str, float]] = {
"latitude": lat,
"longitude": lon,
"current": "european_aqi",
@@ -303,37 +409,51 @@ def fetch_aqi(lat: float, lon: float) -> Optional[Dict[str, Any]]:
return None
-def fetch_place(lat: float, lon: float) -> Optional[str]:
- """Reverse geocode lat/lon to an approximate place. Tries Nominatim first, then Open-Meteo."""
- lang = os.getenv("WEATHER_LANG", "en")
+def extract_place_parts_nominatim(data_dict: JSONDict) -> List[str]:
+ address = ensure_dict(data_dict.get("address"))
+ candidates = [data_dict.get("name"), address.get("city"), address.get("town"), address.get("village"), address.get("hamlet")]
+ name = cast(Optional[str], next((c for c in candidates if c is not None), None))
+ admin1 = cast(Optional[str], address.get("state"))
+ country = cast(Optional[str], address.get("country"))
+ parts: List[str] = []
+ if name is not None:
+ parts.append(name)
+ if admin1 is not None:
+ parts.append(admin1)
+ if country is not None:
+ parts.append(country)
+ return parts
- # 1) Nominatim (OpenStreetMap)
+
+def extract_place_parts_open_meteo(p: JSONDict) -> List[str]:
+ name = cast(Optional[str], p.get("name"))
+ admin1 = cast(Optional[str], p.get("admin1"))
+ country = cast(Optional[str], p.get("country"))
+ parts: List[str] = []
+ for part in [name, admin1, country]:
+ if part is not None:
+ parts.append(part)
+ return parts
+
+
+def reverse_geocode(base: str, params: Dict[str, Union[str, float]], headers: Optional[Dict[str, str]] = None) -> Optional[str]:
try:
- base = "https://nominatim.openstreetmap.org/reverse"
- params = {
- "lat": lat,
- "lon": lon,
- "format": "jsonv2",
- "accept-language": lang,
- }
- headers = {"User-Agent": UA + " Weather.py/1.0"}
resp = SESSION.get(base, params=params, headers=headers, timeout=TIMEOUT)
resp.raise_for_status()
data = resp.json()
- address = data.get("address", {})
- name = data.get("name") or address.get("city") or address.get("town") or address.get("village") or address.get("hamlet")
- admin1 = address.get("state")
- country = address.get("country")
- parts = [part for part in [name, admin1, country] if part]
+ data_dict = ensure_dict(data)
+ parts = extract_place_parts_nominatim(data_dict)
if parts:
return ", ".join(parts)
except Exception as e:
- log_debug(f"Reverse geocoding (Nominatim) failed: {e}")
+ log_debug(f"Reverse geocoding failed: {e}")
+ return None
- # 2) Open-Meteo reverse (fallback)
+
+def reverse_geocode_open_meteo(lat: float, lon: float, lang: str) -> Optional[str]:
try:
base = "https://geocoding-api.open-meteo.com/v1/reverse"
- params = {
+ params: Dict[str, Union[str, float]] = {
"latitude": lat,
"longitude": lon,
"language": lang,
@@ -342,48 +462,117 @@ def fetch_place(lat: float, lon: float) -> Optional[str]:
resp = SESSION.get(base, params=params, timeout=TIMEOUT)
resp.raise_for_status()
data = resp.json()
- results = data.get("results") or []
+ data_dict = ensure_dict(data)
+ results = ensure_list(data_dict.get("results"))
if results:
- p = results[0]
- name = p.get("name")
- admin1 = p.get("admin1")
- country = p.get("country")
- parts = [part for part in [name, admin1, country] if part]
+ p = ensure_dict(results[0])
+ parts = extract_place_parts_open_meteo(p)
if parts:
return ", ".join(parts)
except Exception as e:
log_debug(f"Reverse geocoding (Open-Meteo) failed: {e}")
-
return None
+def fetch_place(lat: float, lon: float) -> Optional[str]:
+ """Reverse geocode lat/lon to an approximate place. Tries Nominatim first, then Open-Meteo."""
+ lang = os.getenv("WEATHER_LANG", "en")
+
+ # 1) Nominatim (OpenStreetMap)
+ base = "https://nominatim.openstreetmap.org/reverse"
+ params: Dict[str, Union[str, float]] = {
+ "lat": lat,
+ "lon": lon,
+ "format": "jsonv2",
+ "accept-language": lang,
+ }
+ headers = {"User-Agent": UA + " Weather.py/1.0"}
+ place = reverse_geocode(base, params, headers)
+ if place:
+ return place
+
+ # 2) Open-Meteo reverse (fallback)
+ return reverse_geocode_open_meteo(lat, lon, lang)
+
+
# =============== Build Output ===============
-def safe_get(dct: Dict[str, Any], *keys, default=None):
- cur: Any = dct
- for k in keys:
+_T = TypeVar("_T")
+
+JSONValue = Union[str, int, float, bool, None, "JSONDict", "JSONList"]
+JSONDict = Dict[str, JSONValue]
+JSONList = List[JSONValue]
+
+
+def ensure_dict(value: Any) -> JSONDict:
+ """Return a JSON-like dict when the incoming value looks like one."""
+ if isinstance(value, dict):
+ return cast(JSONDict, value)
+ # Warn about unexpected type to catch API shape mismatches
+ val_repr = repr(value) if value is not None else "None"
+ if len(val_repr) > 100:
+ val_repr = val_repr[:100] + "..."
+ print(f"Warning: ensure_dict received {type(value).__name__} instead of dict: {val_repr}", file=sys.stderr)
+ return cast(JSONDict, {})
+
+
+def ensure_list(value: Any) -> JSONList:
+ """Return a JSON-like list when the incoming value looks like one."""
+ if isinstance(value, list):
+ return cast(JSONList, value)
+ # Warn about unexpected type to catch API shape mismatches
+ val_repr = repr(value) if value is not None else "None"
+ if len(val_repr) > 100:
+ val_repr = val_repr[:100] + "..."
+ print(f"Warning: ensure_list received {type(value).__name__} instead of list: {val_repr}", file=sys.stderr)
+ return cast(JSONList, [])
+
+
+def safe_get(
+ obj: JSONValue | None,
+ *keys: Union[str, int],
+ default: _T | None = None,
+) -> _T | JSONValue | None:
+ """Safely traverse nested dict/list structures.
+
+ Keys may be strings (for mapping lookups) or ints (for list indices).
+ Returns ``default`` if any lookup fails.
+ """
+
+ cur: JSONValue | None = obj
+ for key in keys:
if isinstance(cur, dict):
- if k not in cur:
+ if not isinstance(key, str) or key not in cur:
return default
- cur = cur[k]
+ cur = cur[key]
elif isinstance(cur, list):
- try:
- cur = cur[k] # type: ignore[index]
- except Exception:
+ if not isinstance(key, int) or key < 0 or key >= len(cur):
return default
+ cur = cur[key]
else:
return default
- return cur
+ return cast(_T | JSONValue | None, cur)
-def build_hourly_precip(forecast: Dict[str, Any]) -> str:
+def get_precipitation_probabilities(forecast: JSONDict) -> List[Optional[float]]:
+ probs_raw = safe_get(forecast, "hourly", "precipitation_probability")
+ probs_raw_list = ensure_list(probs_raw)
+ return [coerce_float(p) if p is not None else None for p in probs_raw_list]
+
+
+def find_current_index(times: List[str], cur_time: Optional[str]) -> int:
+ if cur_time is not None and cur_time in times:
+ return times.index(cur_time)
+ return 0
+
+
+def build_hourly_precip(forecast: JSONDict) -> str:
try:
- times: List[str] = safe_get(forecast, "hourly", "time", default=[]) or []
- probs: List[Optional[float]] = safe_get(
- forecast, "hourly", "precipitation_probability", default=[]
- ) or []
- cur_time: Optional[str] = safe_get(forecast, "current", "time")
- idx = times.index(cur_time) if cur_time in times else 0
+ times_raw = safe_get(forecast, "hourly", "time")
+ times: List[str] = cast(List[str], ensure_list(times_raw))
+ probs = get_precipitation_probabilities(forecast)
+ cur_time: Optional[str] = cast(Optional[str], safe_get(forecast, "current", "time"))
+ idx = find_current_index(times, cur_time)
window = probs[idx : idx + 6]
if not window:
return ""
@@ -393,152 +582,295 @@ def build_hourly_precip(forecast: Dict[str, Any]) -> str:
return ""
-def build_output(lat: float, lon: float, forecast: Dict[str, Any], aqi: Optional[Dict[str, Any]], place: Optional[str] = None) -> Tuple[Dict[str, Any], str]:
- cur = forecast.get("current", {})
- cur_units = forecast.get("current_units", {})
- daily = forecast.get("daily", {})
- daily_units = forecast.get("daily_units", {})
-
- temp_val = cur.get("temperature_2m")
- temp_unit = cur_units.get("temperature_2m", "")
- temp_str = f"{int(round(temp_val))}{temp_unit}" if isinstance(temp_val, (int, float)) else "N/A"
+def build_weather_strings(cur: JSONDict, cur_units: JSONDict, daily: JSONDict, daily_units: JSONDict, temp_unit: str) -> Tuple[str, str, int, int, str, str, str]:
+ temp_val = coerce_float(cur.get("temperature_2m"))
+ temp_unit_str = cast(str, cur_units.get("temperature_2m", ""))
+ temp_str = f"{int(round(temp_val))}{temp_unit_str}" if temp_val is not None else "N/A"
- feels_val = cur.get("apparent_temperature")
- feels_unit = cur_units.get("apparent_temperature", "")
- feels_str = f"Feels like {int(round(feels_val))}{feels_unit}" if isinstance(feels_val, (int, float)) else ""
+ feels_val = coerce_float(cur.get("apparent_temperature"))
+ feels_unit = cast(str, cur_units.get("apparent_temperature", ""))
+ feels_str = f"Feels like {int(round(feels_val))}{feels_unit}" if feels_val is not None else ""
- is_day = int(cur.get("is_day", 1) or 1)
- code = int(cur.get("weather_code", -1) or -1)
+ is_day_val = cur.get("is_day")
+ is_day = coerce_int(is_day_val) or 1
+ weather_code_val = cur.get("weather_code")
+ code = coerce_int(weather_code_val) or -1
icon = wmo_to_icon(code, is_day)
status = wmo_to_status(code)
- # min/max today (index 0)
- tmin_val = safe_get(daily, "temperature_2m_min", 0)
- tmax_val = safe_get(daily, "temperature_2m_max", 0)
- dtemp_unit = daily_units.get("temperature_2m_min", temp_unit)
- tmin_str = f"{int(round(tmin_val))}{dtemp_unit}" if isinstance(tmin_val, (int, float)) else ""
- tmax_str = f"{int(round(tmax_val))}{dtemp_unit}" if isinstance(tmax_val, (int, float)) else ""
+ tmin_val = coerce_float(safe_get(daily, "temperature_2m_min", 0))
+ tmax_val = coerce_float(safe_get(daily, "temperature_2m_max", 0))
+ dtemp_unit = cast(str, daily_units.get("temperature_2m_min", temp_unit))
+ tmin_str = f"{int(round(tmin_val))}{dtemp_unit}" if tmin_val is not None else ""
+ tmax_str = f"{int(round(tmax_val))}{dtemp_unit}" if tmax_val is not None else ""
min_max = f"ī‹‹ {tmin_str}\t\t {tmax_str}" if tmin_str and tmax_str else ""
- wind_val = cur.get("wind_speed_10m")
- wind_unit = cur_units.get("wind_speed_10m", "")
- wind_text = f" {int(round(wind_val))}{wind_unit}" if isinstance(wind_val, (int, float)) else ""
+ return temp_str, feels_str, is_day, code, icon, status, min_max
- hum_val = cur.get("relative_humidity_2m")
- humidity_text = f"îŗ {int(hum_val)}%" if isinstance(hum_val, (int, float)) else ""
- vis_val = cur.get("visibility")
- visibility_text = f"īŽ {format_visibility(vis_val)}" if isinstance(vis_val, (int, float)) else ""
+def build_weather_details(cur: JSONDict, cur_units: JSONDict) -> Tuple[str, str, str]:
+ wind_val_raw = cur.get("wind_speed_10m")
+ wind_val = coerce_float(wind_val_raw)
+ wind_unit = cast(str, cur_units.get("wind_speed_10m", ""))
+ if wind_val is None:
+ log_debug(f"Unexpected type for wind_speed_10m: {type(wind_val_raw)}")
+ wind_text = f" {int(round(wind_val))}{wind_unit}" if wind_val is not None else ""
- aqi_val = safe_get(aqi or {}, "current", "european_aqi")
- aqi_text = f"AQI {int(aqi_val)}" if isinstance(aqi_val, (int, float)) else "AQI N/A"
+ hum_val_raw = cur.get("relative_humidity_2m")
+ hum_val = coerce_float(hum_val_raw)
+ if hum_val is None:
+ log_debug(f"Unexpected type for relative_humidity_2m: {type(hum_val_raw)}")
+ humidity_text = f" {int(hum_val)}%" if hum_val is not None else ""
- hourly_precip = build_hourly_precip(forecast)
- prediction = f"\n\n{hourly_precip}" if hourly_precip else ""
+ vis_val_raw = cur.get("visibility")
+ vis_val = coerce_float(vis_val_raw)
+ if vis_val is None:
+ log_debug(f"Unexpected type for visibility: {type(vis_val_raw)}")
+ visibility_text = f" {format_visibility(vis_val)}" if vis_val is not None else ""
+
+ return wind_text, humidity_text, visibility_text
+
+
+def build_aqi_info(aqi: Optional[Dict[str, Any]]) -> str:
+ aqi_dict = ensure_dict(aqi)
+ aqi_val_raw = safe_get(aqi_dict, "current", "european_aqi")
+ aqi_val = coerce_float(aqi_val_raw)
+ if aqi_val is None:
+ log_debug(f"Unexpected type for european_aqi: {type(aqi_val_raw)}")
+ return f"AQI {int(aqi_val)}" if aqi_val is not None else "AQI N/A"
+
+
+def build_place_str(lat: float, lon: float, place: Optional[str]) -> str:
+ return MANUAL_PLACE or ENV_PLACE or place or f"{lat:.3f}, {lon:.3f}"
+
+
+
+
+class TooltipParams(NamedTuple):
+ temp_str: str
+ icon: str
+ status: str
+ location_text: str
+ feels_str: str
+ min_max: str
+ wind_text: str
+ humidity_text: str
+ visibility_text: str
+ aqi_text: str
+ hourly_precip: str
+
+
+def build_tooltip_markup(params: TooltipParams) -> str:
+ return str.format(
+ "\t\t{}\t\t\n{}\n{}\n{}\n{}\n\n{}\n{}\n{}{}",
+ f'<span size="xx-large">{esc(params.temp_str)}</span>',
+ f"<big> {params.icon}</big>",
+ f"<b>{esc(params.status)}</b>",
+ esc(params.location_text),
+ f"<small>{esc(params.feels_str)}</small>" if params.feels_str else "",
+ f"<b>{esc(params.min_max)}</b>" if params.min_max else "",
+ f"{esc(params.wind_text)}\t{esc(params.humidity_text)}",
+ f"{esc(params.visibility_text)}\t{esc(params.aqi_text)}",
+ f"<i> {esc(params.hourly_precip)}</i>" if params.hourly_precip else "",
+ )
- # Build place string (priority: MANUAL_PLACE > ENV_PLACE > reverse geocode > lat,lon)
- place_str = (MANUAL_PLACE or ENV_PLACE or place or f"{lat:.3f}, {lon:.3f}")
- location_text = f"{LOC_ICON} {place_str}"
- # Build tooltip (markup or plain)
+def build_tooltip_plain(params: TooltipParams) -> str:
+ lines = [
+ f"{params.icon} {params.temp_str}",
+ params.status,
+ params.location_text,
+ ]
+ if params.feels_str:
+ lines.append(params.feels_str)
+ if params.min_max:
+ lines.append(params.min_max)
+ combined_wind = f"{params.wind_text} {params.humidity_text}".strip()
+ if combined_wind:
+ lines.append(combined_wind)
+ combined_visibility = f"{params.visibility_text} {params.aqi_text}".strip()
+ if combined_visibility:
+ lines.append(combined_visibility)
+ if params.hourly_precip:
+ lines.append(params.hourly_precip)
+ return "\n".join([ln for ln in lines if ln])
+
+
+def build_tooltip_text(params: TooltipParams) -> str:
if TOOLTIP_MARKUP:
- # Escape dynamic text to avoid breaking Pango markup
- tooltip_text = str.format(
- "\t\t{}\t\t\n{}\n{}\n{}\n{}\n\n{}\n{}\n{}{}",
- f'<span size="xx-large">{esc(temp_str)}</span>',
- f"<big> {icon}</big>",
- f"<b>{esc(status)}</b>",
- esc(location_text),
- f"<small>{esc(feels_str)}</small>" if feels_str else "",
- f"<b>{esc(min_max)}</b>" if min_max else "",
- f"{esc(wind_text)}\t{esc(humidity_text)}",
- f"{esc(visibility_text)}\t{esc(aqi_text)}",
- f"<i> {esc(prediction)}</i>" if prediction else "",
- )
+ return build_tooltip_markup(params)
else:
- lines = [
- f"{icon} {temp_str}",
- status,
- location_text,
- ]
- if feels_str:
- lines.append(feels_str)
- if min_max:
- lines.append(min_max)
- lines.append(f"{wind_text} {humidity_text}".strip())
- lines.append(f"{visibility_text} {aqi_text}".strip())
- if prediction:
- lines.append(hourly_precip)
- tooltip_text = "\n".join([ln for ln in lines if ln])
+ return build_tooltip_plain(params)
+
+
+def gather_weather_data(forecast: Optional[Dict[str, Any]], aqi: Optional[Dict[str, Any]]) -> WeatherData:
+ forecast_dict = ensure_dict(forecast)
+ cur = ensure_dict(forecast_dict.get("current"))
+ cur_units = ensure_dict(forecast_dict.get("current_units"))
+ daily = ensure_dict(forecast_dict.get("daily"))
+ daily_units = ensure_dict(forecast_dict.get("daily_units"))
+
+ temp_str, feels_str, is_day, code, icon, status, min_max = build_weather_strings(cur, cur_units, daily, daily_units, cast(str, cur_units.get("temperature_2m", "")))
+ wind_text, humidity_text, visibility_text = build_weather_details(cur, cur_units)
+ aqi_text = build_aqi_info(aqi)
+ hourly_precip = build_hourly_precip(forecast_dict)
+
+ return WeatherData(
+ temp_str=temp_str,
+ feels_str=feels_str,
+ icon=icon,
+ status=status,
+ min_max=min_max,
+ wind_text=wind_text,
+ humidity_text=humidity_text,
+ visibility_text=visibility_text,
+ aqi_text=aqi_text,
+ hourly_precip=hourly_precip,
+ is_day=is_day,
+ code=code,
+ )
+
- out_data = {
- "text": f"{icon} {temp_str}",
- "alt": status,
+def build_output(loc: Location, forecast: Optional[Dict[str, Any]], aqi: Optional[Dict[str, Any]]) -> Tuple[Dict[str, str], str]:
+ data = gather_weather_data(forecast, aqi)
+
+ place_str = build_place_str(loc.lat, loc.lon, loc.place)
+ location_text = f"{LOC_ICON} {place_str}"
+
+ tooltip_text = build_tooltip_text(
+ TooltipParams(
+ data.temp_str, data.icon, data.status, location_text, data.feels_str, data.min_max,
+ data.wind_text, data.humidity_text, data.visibility_text, data.aqi_text, data.hourly_precip
+ )
+ )
+
+ out_data: Dict[str, Any] = {
+ "text": f"{data.icon} {data.temp_str}",
+ "alt": data.status,
"tooltip": tooltip_text,
- "class": f"wmo-{code} {'day' if is_day else 'night'}",
+ "class": f"wmo-{data.code} {'day' if data.is_day else 'night'}",
}
simple_weather = (
- f"{icon} {status}\n"
- + f" {temp_str} ({feels_str})\n"
- + (f"{wind_text} \n" if wind_text else "")
- + (f"{humidity_text} \n" if humidity_text else "")
- + f"{visibility_text} {aqi_text}\n"
+ f"{data.icon} {data.status}\n"
+ + f" {data.temp_str} ({data.feels_str})\n"
+ + (f"{data.wind_text} \n" if data.wind_text else "")
+ + (f"{data.humidity_text} \n" if data.humidity_text else "")
+ + f"{data.visibility_text} {data.aqi_text}\n"
)
return out_data, simple_weather
-def main() -> None:
- lat, lon = get_coords()
-
- # Try cache first
+def try_cached_weather(lat: float, lon: float) -> Optional[Tuple[Dict[str, str], str]]:
cached = read_api_cache()
- if cached and isinstance(cached, dict):
- forecast = cached.get("forecast")
- aqi = cached.get("aqi")
- cached_place = cached.get("place") if isinstance(cached.get("place"), str) else None
+ if cached:
+ forecast = cast(Optional[Dict[str, Any]], cached.get("forecast"))
+ aqi = cast(Optional[Dict[str, Any]], cached.get("aqi"))
+ place_val = cached.get("place")
+ cached_place = place_val if isinstance(place_val, str) else None
place_effective = MANUAL_PLACE or ENV_PLACE or cached_place
try:
- out, simple = build_output(lat, lon, forecast, aqi, place_effective)
- print(json.dumps(out, ensure_ascii=False))
- write_simple_text_cache(simple)
- return
+ return build_output(Location(lat, lon, place_effective), forecast, aqi)
except Exception as e:
print(f"Cached data build failed, refetching: {e}", file=sys.stderr)
+ return None
- # Fetch fresh
+
+def fetch_fresh_weather(lat: float, lon: float) -> Optional[Tuple[Dict[str, str], str]]:
try:
forecast = fetch_open_meteo(lat, lon)
aqi = fetch_aqi(lat, lon)
- # Use manual/env place if provided; otherwise reverse geocode
place_effective = MANUAL_PLACE or ENV_PLACE or fetch_place(lat, lon)
write_api_cache({"forecast": forecast, "aqi": aqi, "place": place_effective})
- out, simple = build_output(lat, lon, forecast, aqi, place_effective)
- print(json.dumps(out, ensure_ascii=False))
- write_simple_text_cache(simple)
+ return build_output(Location(lat, lon, place_effective), forecast, aqi)
except Exception as e:
print(f"Open-Meteo fetch failed: {e}", file=sys.stderr)
- # Last resort: try stale cache without TTL
- try:
- if os.path.exists(API_CACHE_PATH):
- with open(API_CACHE_PATH, "r", encoding="utf-8") as f:
- stale = json.load(f)
- out, simple = build_output(lat, lon, stale.get("forecast", {}), stale.get("aqi"), stale.get("place") if isinstance(stale.get("place"), str) else None)
- print(json.dumps(out, ensure_ascii=False))
- write_simple_text_cache(simple)
- return
- except Exception as e2:
- print(f"Failed to use stale cache: {e2}", file=sys.stderr)
- # Fallback minimal output
- fallback = {
- "text": f"{WEATHER_ICONS['default']} N/A",
- "alt": "Unavailable",
- "tooltip": "Weather unavailable",
- "class": "unavailable",
- }
- print(json.dumps(fallback, ensure_ascii=False))
+ return None
+
+
+def try_stale_weather(lat: float, lon: float) -> Optional[Tuple[Dict[str, str], str]]:
+ try:
+ if API_CACHE_PATH.exists():
+ with API_CACHE_PATH.open("r", encoding="utf-8") as f:
+ stale = json.load(f)
+ stale_dict = ensure_dict(stale)
+ place_val = stale_dict.get("place")
+ place = place_val if isinstance(place_val, str) else None
+ forecast = cast(Optional[Dict[str, Any]], stale_dict.get("forecast"))
+ aqi = cast(Optional[Dict[str, Any]], stale_dict.get("aqi"))
+ return build_output(Location(lat, lon, place), forecast, aqi)
+ except Exception as e2:
+ print(f"Failed to use stale cache: {e2}", file=sys.stderr)
+ return None
+
+
+def main() -> None:
+ lat, lon = get_coords()
+
+ # Try cache first
+ result = try_cached_weather(lat, lon)
+ if result:
+ out, simple = result
+ print(json.dumps(out, ensure_ascii=False))
+ write_simple_text_cache(simple)
+ return
+
+ # Fetch fresh
+ result = fetch_fresh_weather(lat, lon)
+ if result:
+ out, simple = result
+ print(json.dumps(out, ensure_ascii=False))
+ write_simple_text_cache(simple)
+ return
+
+ # Last resort: try stale cache
+ result = try_stale_weather(lat, lon)
+ if result:
+ out, simple = result
+ print(json.dumps(out, ensure_ascii=False))
+ write_simple_text_cache(simple)
+ return
+
+ # Fallback minimal output
+ fallback = {
+ "text": f"{WEATHER_ICONS['default']} N/A",
+ "alt": "Unavailable",
+ "tooltip": "Weather unavailable",
+ "class": "unavailable",
+ }
+ print(json.dumps(fallback, ensure_ascii=False))
+
+
+def test_coerce_functions():
+ """Manual testing for coerce functions."""
+ # Test coerce_int
+ assert coerce_int(5) == 5
+ assert coerce_int(5.5) == 5
+ assert coerce_int("5") == 5
+ assert coerce_int("5.7") == 5
+ assert coerce_int("abc") is None
+ assert coerce_int(None) is None
+
+ # Test coerce_float
+ assert coerce_float(5.5) == 5.5
+ assert coerce_float(5) == 5.0
+ assert coerce_float("5.5") == 5.5
+ assert coerce_float("abc") is None
+ assert coerce_float(None) is None
+
+ # Test coerce_number
+ assert coerce_number(5) == 5
+ assert coerce_number(5.5) == 5.5
+ assert coerce_number("5") == 5
+ assert coerce_number("5.5") == 5.5
+ assert coerce_number("abc") is None
+
+ print("All coerce function tests passed.", file=sys.stderr)
if __name__ == "__main__":
- main()
+ if len(sys.argv) > 1 and sys.argv[1] == "--test":
+ test_coerce_functions()
+ else:
+ main()
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage