diff options
| author | Prabin Panta <pantaprabin30@gmail.com> | 2025-10-26 20:07:30 +0545 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-10-26 20:07:30 +0545 |
| commit | 957b0db7dd3e9274aa67bb747ad5a00615f6b23a (patch) | |
| tree | 31f706fc0f0f500ddba748efb9227aa7fb27d9b7 /config | |
| parent | 53fe242ffdddf74bf13e32999027053ee4614b20 (diff) | |
| parent | f876fc42308b949e791e6f685fa9c6f32605667e (diff) | |
Merge pull request #2 from prabinpanta0/development-weather
Used code rabbit extension and github copilot for review
Diffstat (limited to 'config')
| -rwxr-xr-x | config/hypr/UserScripts/Weather.py | 688 |
1 files changed, 510 insertions, 178 deletions
diff --git a/config/hypr/UserScripts/Weather.py b/config/hypr/UserScripts/Weather.py index ca1d5281..3c5d58f9 100755 --- a/config/hypr/UserScripts/Weather.py +++ b/config/hypr/UserScripts/Weather.py @@ -3,15 +3,42 @@ # Rewritten to use Open-Meteo APIs (worldwide, no API key) for robust weather data. # Outputs Waybar-compatible JSON and a simple text cache. +from __future__ import annotations + import json import os import sys import time import html -from typing import Any, Dict, List, Optional, Tuple - +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union, cast +from typing import NamedTuple import requests +from dataclasses import dataclass + +@dataclass +class Location: + lat: float + lon: float + place: Optional[str] = None + + +@dataclass +class WeatherData: + temp_str: str + feels_str: str + icon: str + status: str + min_max: str + wind_text: str + humidity_text: str + visibility_text: str + aqi_text: str + hourly_precip: str + is_day: int + code: int + # =============== Configuration =============== # You can configure behavior via environment variables OR the constants below. # Examples (zsh): @@ -27,9 +54,9 @@ import requests # export WEATHER_TOOLTIP_MARKUP=1 # 1 to enable Pango markup, 0 to disable # export WEATHER_LOC_ICON="đ" # or "*" for ASCII-only # -CACHE_DIR = os.path.expanduser("~/.cache") -API_CACHE_PATH = os.path.join(CACHE_DIR, "open_meteo_cache.json") -SIMPLE_TEXT_CACHE_PATH = os.path.join(CACHE_DIR, ".weather_cache") +CACHE_DIR: Path = Path.home() / ".cache" +API_CACHE_PATH: Path = CACHE_DIR / "open_meteo_cache.json" +SIMPLE_TEXT_CACHE_PATH: Path = CACHE_DIR / ".weather_cache" CACHE_TTL_SECONDS = int(os.getenv("WEATHER_CACHE_TTL", "600")) # default 10 minutes # Units: metric or imperial (default metric) @@ -138,19 +165,68 @@ def log_debug(msg: str) -> None: def ensure_cache_dir() -> None: try: - os.makedirs(CACHE_DIR, exist_ok=True) + # CACHE_DIR is a Path + CACHE_DIR.mkdir(parents=True, exist_ok=True) except Exception as e: print(f"Error creating cache dir: {e}", file=sys.stderr) +def _coerce_numeric(value: Any, to_int: bool) -> Optional[Union[int, float]]: + if to_int: + if isinstance(value, int): + return value + if isinstance(value, (float, str)): + try: + return int(float(value)) + except (ValueError, TypeError): + return None + return None + else: + if isinstance(value, float): + return value + if isinstance(value, int): + return float(value) + if isinstance(value, str): + try: + return float(value) + except (ValueError, TypeError): + return None + return None + + +def coerce_int(value: Any) -> Optional[int]: + return cast(Optional[int], _coerce_numeric(value, True)) + + +def coerce_float(value: Any) -> Optional[float]: + return cast(Optional[float], _coerce_numeric(value, False)) + + +def coerce_number(value: Any) -> Union[int, float, None]: + if isinstance(value, (int, float)): + return value + if isinstance(value, str): + try: + # Parse to float, then return int if it has no fractional part + f = float(value) + return int(f) if f.is_integer() else f + except (ValueError, TypeError): + return None + return None + + def read_api_cache() -> Optional[Dict[str, Any]]: try: - if not os.path.exists(API_CACHE_PATH): + if not API_CACHE_PATH.exists(): return None - with open(API_CACHE_PATH, "r", encoding="utf-8") as f: + with API_CACHE_PATH.open("r", encoding="utf-8") as f: data = json.load(f) - if (time.time() - data.get("timestamp", 0)) <= CACHE_TTL_SECONDS: - return data + # Use ensure_dict for safety + data_dict = ensure_dict(data) + timestamp_val = data_dict.get("timestamp", 0) + timestamp = coerce_float(timestamp_val) or 0 + if (time.time() - timestamp) <= CACHE_TTL_SECONDS: + return data_dict return None except Exception as e: print(f"Error reading cache: {e}", file=sys.stderr) @@ -161,7 +237,7 @@ def write_api_cache(payload: Dict[str, Any]) -> None: try: ensure_cache_dir() payload["timestamp"] = time.time() - with open(API_CACHE_PATH, "w", encoding="utf-8") as f: + with API_CACHE_PATH.open("w", encoding="utf-8") as f: json.dump(payload, f) except Exception as e: print(f"Error writing API cache: {e}", file=sys.stderr) @@ -170,34 +246,42 @@ def write_api_cache(payload: Dict[str, Any]) -> None: def write_simple_text_cache(text: str) -> None: try: ensure_cache_dir() - with open(SIMPLE_TEXT_CACHE_PATH, "w", encoding="utf-8") as f: + with SIMPLE_TEXT_CACHE_PATH.open("w", encoding="utf-8") as f: f.write(text) except Exception as e: print(f"Error writing simple cache: {e}", file=sys.stderr) -def get_coords() -> Tuple[float, float]: - # 1) Explicit env +def get_coords_from_env() -> Optional[Tuple[float, float]]: if ENV_LAT and ENV_LON: try: return float(ENV_LAT), float(ENV_LON) except ValueError: print("Invalid WEATHER_LAT/WEATHER_LON; falling back to IP geolocation", file=sys.stderr) + return None + - # 2) Try cached coordinates from last successful forecast +def get_coords_from_cache() -> Optional[Tuple[float, float]]: try: cached = read_api_cache() - if cached and isinstance(cached, dict): - fc = cached.get("forecast") or {} - lat = fc.get("latitude") - lon = fc.get("longitude") - if isinstance(lat, (int, float)) and isinstance(lon, (int, float)): - return float(lat), float(lon) + if cached: + fc = ensure_dict(cached.get("forecast")) + lat_raw = safe_get(fc, "latitude") + lon_raw = safe_get(fc, "longitude") + lat = coerce_float(lat_raw) + lon = coerce_float(lon_raw) + if lat is None: + log_debug(f"Unexpected type for cached latitude: {type(lat_raw)}") + if lon is None: + log_debug(f"Unexpected type for cached longitude: {type(lon_raw)}") + if lat is not None and lon is not None: + return lat, lon except Exception as e: print(f"Reading cached coords failed: {e}", file=sys.stderr) + return None + - # 3) IP-based geolocation with multiple providers (prefer ipwho.is, ipapi.co; ipinfo.io as fallback) - # ipwho.is +def get_coords_from_ipwho() -> Optional[Tuple[float, float]]: try: resp = SESSION.get("https://ipwho.is/", timeout=TIMEOUT) resp.raise_for_status() @@ -209,8 +293,10 @@ def get_coords() -> Tuple[float, float]: return float(lat), float(lon) except Exception as e: print(f"ipwho.is failed: {e}", file=sys.stderr) + return None - # ipapi.co + +def get_coords_from_ipapi() -> Optional[Tuple[float, float]]: try: resp = SESSION.get("https://ipapi.co/json", timeout=TIMEOUT) resp.raise_for_status() @@ -221,8 +307,10 @@ def get_coords() -> Tuple[float, float]: return float(lat), float(lon) except Exception as e: print(f"ipapi.co failed: {e}", file=sys.stderr) + return None - # ipinfo.io (fallback) + +def get_coords_from_ipinfo() -> Optional[Tuple[float, float]]: try: resp = SESSION.get("https://ipinfo.io/json", timeout=TIMEOUT) resp.raise_for_status() @@ -233,6 +321,24 @@ def get_coords() -> Tuple[float, float]: return float(lat_s), float(lon_s) except Exception as e: print(f"ipinfo.io failed: {e}", file=sys.stderr) + return None + + +def get_coords() -> Tuple[float, float]: + # 1) Explicit env + coords = get_coords_from_env() + if coords: + return coords + + # 2) Try cached coordinates + coords = get_coords_from_cache() + if coords: + return coords + + # 3) IP-based geolocation + coords = get_coords_from_ipwho() or get_coords_from_ipapi() or get_coords_from_ipinfo() + if coords: + return coords # 4) Last resort print("IP geolocation failed: no providers succeeded", file=sys.stderr) @@ -272,7 +378,7 @@ def format_visibility(meters: Optional[float]) -> str: def fetch_open_meteo(lat: float, lon: float) -> Dict[str, Any]: base = "https://api.open-meteo.com/v1/forecast" - params = { + params: Dict[str, Union[str, float]] = { "latitude": lat, "longitude": lon, "current": "temperature_2m,apparent_temperature,relative_humidity_2m,wind_speed_10m,wind_direction_10m,weather_code,visibility,precipitation,pressure_msl,is_day", @@ -289,7 +395,7 @@ def fetch_open_meteo(lat: float, lon: float) -> Dict[str, Any]: def fetch_aqi(lat: float, lon: float) -> Optional[Dict[str, Any]]: try: base = "https://air-quality-api.open-meteo.com/v1/air-quality" - params = { + params: Dict[str, Union[str, float]] = { "latitude": lat, "longitude": lon, "current": "european_aqi", @@ -303,37 +409,51 @@ def fetch_aqi(lat: float, lon: float) -> Optional[Dict[str, Any]]: return None -def fetch_place(lat: float, lon: float) -> Optional[str]: - """Reverse geocode lat/lon to an approximate place. Tries Nominatim first, then Open-Meteo.""" - lang = os.getenv("WEATHER_LANG", "en") +def extract_place_parts_nominatim(data_dict: JSONDict) -> List[str]: + address = ensure_dict(data_dict.get("address")) + candidates = [data_dict.get("name"), address.get("city"), address.get("town"), address.get("village"), address.get("hamlet")] + name = cast(Optional[str], next((c for c in candidates if c is not None), None)) + admin1 = cast(Optional[str], address.get("state")) + country = cast(Optional[str], address.get("country")) + parts: List[str] = [] + if name is not None: + parts.append(name) + if admin1 is not None: + parts.append(admin1) + if country is not None: + parts.append(country) + return parts - # 1) Nominatim (OpenStreetMap) + +def extract_place_parts_open_meteo(p: JSONDict) -> List[str]: + name = cast(Optional[str], p.get("name")) + admin1 = cast(Optional[str], p.get("admin1")) + country = cast(Optional[str], p.get("country")) + parts: List[str] = [] + for part in [name, admin1, country]: + if part is not None: + parts.append(part) + return parts + + +def reverse_geocode(base: str, params: Dict[str, Union[str, float]], headers: Optional[Dict[str, str]] = None) -> Optional[str]: try: - base = "https://nominatim.openstreetmap.org/reverse" - params = { - "lat": lat, - "lon": lon, - "format": "jsonv2", - "accept-language": lang, - } - headers = {"User-Agent": UA + " Weather.py/1.0"} resp = SESSION.get(base, params=params, headers=headers, timeout=TIMEOUT) resp.raise_for_status() data = resp.json() - address = data.get("address", {}) - name = data.get("name") or address.get("city") or address.get("town") or address.get("village") or address.get("hamlet") - admin1 = address.get("state") - country = address.get("country") - parts = [part for part in [name, admin1, country] if part] + data_dict = ensure_dict(data) + parts = extract_place_parts_nominatim(data_dict) if parts: return ", ".join(parts) except Exception as e: - log_debug(f"Reverse geocoding (Nominatim) failed: {e}") + log_debug(f"Reverse geocoding failed: {e}") + return None - # 2) Open-Meteo reverse (fallback) + +def reverse_geocode_open_meteo(lat: float, lon: float, lang: str) -> Optional[str]: try: base = "https://geocoding-api.open-meteo.com/v1/reverse" - params = { + params: Dict[str, Union[str, float]] = { "latitude": lat, "longitude": lon, "language": lang, @@ -342,48 +462,117 @@ def fetch_place(lat: float, lon: float) -> Optional[str]: resp = SESSION.get(base, params=params, timeout=TIMEOUT) resp.raise_for_status() data = resp.json() - results = data.get("results") or [] + data_dict = ensure_dict(data) + results = ensure_list(data_dict.get("results")) if results: - p = results[0] - name = p.get("name") - admin1 = p.get("admin1") - country = p.get("country") - parts = [part for part in [name, admin1, country] if part] + p = ensure_dict(results[0]) + parts = extract_place_parts_open_meteo(p) if parts: return ", ".join(parts) except Exception as e: log_debug(f"Reverse geocoding (Open-Meteo) failed: {e}") - return None +def fetch_place(lat: float, lon: float) -> Optional[str]: + """Reverse geocode lat/lon to an approximate place. Tries Nominatim first, then Open-Meteo.""" + lang = os.getenv("WEATHER_LANG", "en") + + # 1) Nominatim (OpenStreetMap) + base = "https://nominatim.openstreetmap.org/reverse" + params: Dict[str, Union[str, float]] = { + "lat": lat, + "lon": lon, + "format": "jsonv2", + "accept-language": lang, + } + headers = {"User-Agent": UA + " Weather.py/1.0"} + place = reverse_geocode(base, params, headers) + if place: + return place + + # 2) Open-Meteo reverse (fallback) + return reverse_geocode_open_meteo(lat, lon, lang) + + # =============== Build Output =============== -def safe_get(dct: Dict[str, Any], *keys, default=None): - cur: Any = dct - for k in keys: +_T = TypeVar("_T") + +JSONValue = Union[str, int, float, bool, None, "JSONDict", "JSONList"] +JSONDict = Dict[str, JSONValue] +JSONList = List[JSONValue] + + +def ensure_dict(value: Any) -> JSONDict: + """Return a JSON-like dict when the incoming value looks like one.""" + if isinstance(value, dict): + return cast(JSONDict, value) + # Warn about unexpected type to catch API shape mismatches + val_repr = repr(value) if value is not None else "None" + if len(val_repr) > 100: + val_repr = val_repr[:100] + "..." + print(f"Warning: ensure_dict received {type(value).__name__} instead of dict: {val_repr}", file=sys.stderr) + return cast(JSONDict, {}) + + +def ensure_list(value: Any) -> JSONList: + """Return a JSON-like list when the incoming value looks like one.""" + if isinstance(value, list): + return cast(JSONList, value) + # Warn about unexpected type to catch API shape mismatches + val_repr = repr(value) if value is not None else "None" + if len(val_repr) > 100: + val_repr = val_repr[:100] + "..." + print(f"Warning: ensure_list received {type(value).__name__} instead of list: {val_repr}", file=sys.stderr) + return cast(JSONList, []) + + +def safe_get( + obj: JSONValue | None, + *keys: Union[str, int], + default: _T | None = None, +) -> _T | JSONValue | None: + """Safely traverse nested dict/list structures. + + Keys may be strings (for mapping lookups) or ints (for list indices). + Returns ``default`` if any lookup fails. + """ + + cur: JSONValue | None = obj + for key in keys: if isinstance(cur, dict): - if k not in cur: + if not isinstance(key, str) or key not in cur: return default - cur = cur[k] + cur = cur[key] elif isinstance(cur, list): - try: - cur = cur[k] # type: ignore[index] - except Exception: + if not isinstance(key, int) or key < 0 or key >= len(cur): return default + cur = cur[key] else: return default - return cur + return cast(_T | JSONValue | None, cur) -def build_hourly_precip(forecast: Dict[str, Any]) -> str: +def get_precipitation_probabilities(forecast: JSONDict) -> List[Optional[float]]: + probs_raw = safe_get(forecast, "hourly", "precipitation_probability") + probs_raw_list = ensure_list(probs_raw) + return [coerce_float(p) if p is not None else None for p in probs_raw_list] + + +def find_current_index(times: List[str], cur_time: Optional[str]) -> int: + if cur_time is not None and cur_time in times: + return times.index(cur_time) + return 0 + + +def build_hourly_precip(forecast: JSONDict) -> str: try: - times: List[str] = safe_get(forecast, "hourly", "time", default=[]) or [] - probs: List[Optional[float]] = safe_get( - forecast, "hourly", "precipitation_probability", default=[] - ) or [] - cur_time: Optional[str] = safe_get(forecast, "current", "time") - idx = times.index(cur_time) if cur_time in times else 0 + times_raw = safe_get(forecast, "hourly", "time") + times: List[str] = cast(List[str], ensure_list(times_raw)) + probs = get_precipitation_probabilities(forecast) + cur_time: Optional[str] = cast(Optional[str], safe_get(forecast, "current", "time")) + idx = find_current_index(times, cur_time) window = probs[idx : idx + 6] if not window: return "" @@ -393,152 +582,295 @@ def build_hourly_precip(forecast: Dict[str, Any]) -> str: return "" -def build_output(lat: float, lon: float, forecast: Dict[str, Any], aqi: Optional[Dict[str, Any]], place: Optional[str] = None) -> Tuple[Dict[str, Any], str]: - cur = forecast.get("current", {}) - cur_units = forecast.get("current_units", {}) - daily = forecast.get("daily", {}) - daily_units = forecast.get("daily_units", {}) - - temp_val = cur.get("temperature_2m") - temp_unit = cur_units.get("temperature_2m", "") - temp_str = f"{int(round(temp_val))}{temp_unit}" if isinstance(temp_val, (int, float)) else "N/A" +def build_weather_strings(cur: JSONDict, cur_units: JSONDict, daily: JSONDict, daily_units: JSONDict, temp_unit: str) -> Tuple[str, str, int, int, str, str, str]: + temp_val = coerce_float(cur.get("temperature_2m")) + temp_unit_str = cast(str, cur_units.get("temperature_2m", "")) + temp_str = f"{int(round(temp_val))}{temp_unit_str}" if temp_val is not None else "N/A" - feels_val = cur.get("apparent_temperature") - feels_unit = cur_units.get("apparent_temperature", "") - feels_str = f"Feels like {int(round(feels_val))}{feels_unit}" if isinstance(feels_val, (int, float)) else "" + feels_val = coerce_float(cur.get("apparent_temperature")) + feels_unit = cast(str, cur_units.get("apparent_temperature", "")) + feels_str = f"Feels like {int(round(feels_val))}{feels_unit}" if feels_val is not None else "" - is_day = int(cur.get("is_day", 1) or 1) - code = int(cur.get("weather_code", -1) or -1) + is_day_val = cur.get("is_day") + is_day = coerce_int(is_day_val) or 1 + weather_code_val = cur.get("weather_code") + code = coerce_int(weather_code_val) or -1 icon = wmo_to_icon(code, is_day) status = wmo_to_status(code) - # min/max today (index 0) - tmin_val = safe_get(daily, "temperature_2m_min", 0) - tmax_val = safe_get(daily, "temperature_2m_max", 0) - dtemp_unit = daily_units.get("temperature_2m_min", temp_unit) - tmin_str = f"{int(round(tmin_val))}{dtemp_unit}" if isinstance(tmin_val, (int, float)) else "" - tmax_str = f"{int(round(tmax_val))}{dtemp_unit}" if isinstance(tmax_val, (int, float)) else "" + tmin_val = coerce_float(safe_get(daily, "temperature_2m_min", 0)) + tmax_val = coerce_float(safe_get(daily, "temperature_2m_max", 0)) + dtemp_unit = cast(str, daily_units.get("temperature_2m_min", temp_unit)) + tmin_str = f"{int(round(tmin_val))}{dtemp_unit}" if tmin_val is not None else "" + tmax_str = f"{int(round(tmax_val))}{dtemp_unit}" if tmax_val is not None else "" min_max = f"ī {tmin_str}\t\tī {tmax_str}" if tmin_str and tmax_str else "" - wind_val = cur.get("wind_speed_10m") - wind_unit = cur_units.get("wind_speed_10m", "") - wind_text = f"îž {int(round(wind_val))}{wind_unit}" if isinstance(wind_val, (int, float)) else "" + return temp_str, feels_str, is_day, code, icon, status, min_max - hum_val = cur.get("relative_humidity_2m") - humidity_text = f"îŗ {int(hum_val)}%" if isinstance(hum_val, (int, float)) else "" - vis_val = cur.get("visibility") - visibility_text = f"īŽ {format_visibility(vis_val)}" if isinstance(vis_val, (int, float)) else "" +def build_weather_details(cur: JSONDict, cur_units: JSONDict) -> Tuple[str, str, str]: + wind_val_raw = cur.get("wind_speed_10m") + wind_val = coerce_float(wind_val_raw) + wind_unit = cast(str, cur_units.get("wind_speed_10m", "")) + if wind_val is None: + log_debug(f"Unexpected type for wind_speed_10m: {type(wind_val_raw)}") + wind_text = f" {int(round(wind_val))}{wind_unit}" if wind_val is not None else "" - aqi_val = safe_get(aqi or {}, "current", "european_aqi") - aqi_text = f"AQI {int(aqi_val)}" if isinstance(aqi_val, (int, float)) else "AQI N/A" + hum_val_raw = cur.get("relative_humidity_2m") + hum_val = coerce_float(hum_val_raw) + if hum_val is None: + log_debug(f"Unexpected type for relative_humidity_2m: {type(hum_val_raw)}") + humidity_text = f" {int(hum_val)}%" if hum_val is not None else "" - hourly_precip = build_hourly_precip(forecast) - prediction = f"\n\n{hourly_precip}" if hourly_precip else "" + vis_val_raw = cur.get("visibility") + vis_val = coerce_float(vis_val_raw) + if vis_val is None: + log_debug(f"Unexpected type for visibility: {type(vis_val_raw)}") + visibility_text = f" {format_visibility(vis_val)}" if vis_val is not None else "" + + return wind_text, humidity_text, visibility_text + + +def build_aqi_info(aqi: Optional[Dict[str, Any]]) -> str: + aqi_dict = ensure_dict(aqi) + aqi_val_raw = safe_get(aqi_dict, "current", "european_aqi") + aqi_val = coerce_float(aqi_val_raw) + if aqi_val is None: + log_debug(f"Unexpected type for european_aqi: {type(aqi_val_raw)}") + return f"AQI {int(aqi_val)}" if aqi_val is not None else "AQI N/A" + + +def build_place_str(lat: float, lon: float, place: Optional[str]) -> str: + return MANUAL_PLACE or ENV_PLACE or place or f"{lat:.3f}, {lon:.3f}" + + + + +class TooltipParams(NamedTuple): + temp_str: str + icon: str + status: str + location_text: str + feels_str: str + min_max: str + wind_text: str + humidity_text: str + visibility_text: str + aqi_text: str + hourly_precip: str + + +def build_tooltip_markup(params: TooltipParams) -> str: + return str.format( + "\t\t{}\t\t\n{}\n{}\n{}\n{}\n\n{}\n{}\n{}{}", + f'<span size="xx-large">{esc(params.temp_str)}</span>', + f"<big> {params.icon}</big>", + f"<b>{esc(params.status)}</b>", + esc(params.location_text), + f"<small>{esc(params.feels_str)}</small>" if params.feels_str else "", + f"<b>{esc(params.min_max)}</b>" if params.min_max else "", + f"{esc(params.wind_text)}\t{esc(params.humidity_text)}", + f"{esc(params.visibility_text)}\t{esc(params.aqi_text)}", + f"<i> {esc(params.hourly_precip)}</i>" if params.hourly_precip else "", + ) - # Build place string (priority: MANUAL_PLACE > ENV_PLACE > reverse geocode > lat,lon) - place_str = (MANUAL_PLACE or ENV_PLACE or place or f"{lat:.3f}, {lon:.3f}") - location_text = f"{LOC_ICON} {place_str}" - # Build tooltip (markup or plain) +def build_tooltip_plain(params: TooltipParams) -> str: + lines = [ + f"{params.icon} {params.temp_str}", + params.status, + params.location_text, + ] + if params.feels_str: + lines.append(params.feels_str) + if params.min_max: + lines.append(params.min_max) + combined_wind = f"{params.wind_text} {params.humidity_text}".strip() + if combined_wind: + lines.append(combined_wind) + combined_visibility = f"{params.visibility_text} {params.aqi_text}".strip() + if combined_visibility: + lines.append(combined_visibility) + if params.hourly_precip: + lines.append(params.hourly_precip) + return "\n".join([ln for ln in lines if ln]) + + +def build_tooltip_text(params: TooltipParams) -> str: if TOOLTIP_MARKUP: - # Escape dynamic text to avoid breaking Pango markup - tooltip_text = str.format( - "\t\t{}\t\t\n{}\n{}\n{}\n{}\n\n{}\n{}\n{}{}", - f'<span size="xx-large">{esc(temp_str)}</span>', - f"<big> {icon}</big>", - f"<b>{esc(status)}</b>", - esc(location_text), - f"<small>{esc(feels_str)}</small>" if feels_str else "", - f"<b>{esc(min_max)}</b>" if min_max else "", - f"{esc(wind_text)}\t{esc(humidity_text)}", - f"{esc(visibility_text)}\t{esc(aqi_text)}", - f"<i> {esc(prediction)}</i>" if prediction else "", - ) + return build_tooltip_markup(params) else: - lines = [ - f"{icon} {temp_str}", - status, - location_text, - ] - if feels_str: - lines.append(feels_str) - if min_max: - lines.append(min_max) - lines.append(f"{wind_text} {humidity_text}".strip()) - lines.append(f"{visibility_text} {aqi_text}".strip()) - if prediction: - lines.append(hourly_precip) - tooltip_text = "\n".join([ln for ln in lines if ln]) + return build_tooltip_plain(params) + + +def gather_weather_data(forecast: Optional[Dict[str, Any]], aqi: Optional[Dict[str, Any]]) -> WeatherData: + forecast_dict = ensure_dict(forecast) + cur = ensure_dict(forecast_dict.get("current")) + cur_units = ensure_dict(forecast_dict.get("current_units")) + daily = ensure_dict(forecast_dict.get("daily")) + daily_units = ensure_dict(forecast_dict.get("daily_units")) + + temp_str, feels_str, is_day, code, icon, status, min_max = build_weather_strings(cur, cur_units, daily, daily_units, cast(str, cur_units.get("temperature_2m", ""))) + wind_text, humidity_text, visibility_text = build_weather_details(cur, cur_units) + aqi_text = build_aqi_info(aqi) + hourly_precip = build_hourly_precip(forecast_dict) + + return WeatherData( + temp_str=temp_str, + feels_str=feels_str, + icon=icon, + status=status, + min_max=min_max, + wind_text=wind_text, + humidity_text=humidity_text, + visibility_text=visibility_text, + aqi_text=aqi_text, + hourly_precip=hourly_precip, + is_day=is_day, + code=code, + ) + - out_data = { - "text": f"{icon} {temp_str}", - "alt": status, +def build_output(loc: Location, forecast: Optional[Dict[str, Any]], aqi: Optional[Dict[str, Any]]) -> Tuple[Dict[str, str], str]: + data = gather_weather_data(forecast, aqi) + + place_str = build_place_str(loc.lat, loc.lon, loc.place) + location_text = f"{LOC_ICON} {place_str}" + + tooltip_text = build_tooltip_text( + TooltipParams( + data.temp_str, data.icon, data.status, location_text, data.feels_str, data.min_max, + data.wind_text, data.humidity_text, data.visibility_text, data.aqi_text, data.hourly_precip + ) + ) + + out_data: Dict[str, Any] = { + "text": f"{data.icon} {data.temp_str}", + "alt": data.status, "tooltip": tooltip_text, - "class": f"wmo-{code} {'day' if is_day else 'night'}", + "class": f"wmo-{data.code} {'day' if data.is_day else 'night'}", } simple_weather = ( - f"{icon} {status}\n" - + f"ī {temp_str} ({feels_str})\n" - + (f"{wind_text} \n" if wind_text else "") - + (f"{humidity_text} \n" if humidity_text else "") - + f"{visibility_text} {aqi_text}\n" + f"{data.icon} {data.status}\n" + + f"ī {data.temp_str} ({data.feels_str})\n" + + (f"{data.wind_text} \n" if data.wind_text else "") + + (f"{data.humidity_text} \n" if data.humidity_text else "") + + f"{data.visibility_text} {data.aqi_text}\n" ) return out_data, simple_weather -def main() -> None: - lat, lon = get_coords() - - # Try cache first +def try_cached_weather(lat: float, lon: float) -> Optional[Tuple[Dict[str, str], str]]: cached = read_api_cache() - if cached and isinstance(cached, dict): - forecast = cached.get("forecast") - aqi = cached.get("aqi") - cached_place = cached.get("place") if isinstance(cached.get("place"), str) else None + if cached: + forecast = cast(Optional[Dict[str, Any]], cached.get("forecast")) + aqi = cast(Optional[Dict[str, Any]], cached.get("aqi")) + place_val = cached.get("place") + cached_place = place_val if isinstance(place_val, str) else None place_effective = MANUAL_PLACE or ENV_PLACE or cached_place try: - out, simple = build_output(lat, lon, forecast, aqi, place_effective) - print(json.dumps(out, ensure_ascii=False)) - write_simple_text_cache(simple) - return + return build_output(Location(lat, lon, place_effective), forecast, aqi) except Exception as e: print(f"Cached data build failed, refetching: {e}", file=sys.stderr) + return None - # Fetch fresh + +def fetch_fresh_weather(lat: float, lon: float) -> Optional[Tuple[Dict[str, str], str]]: try: forecast = fetch_open_meteo(lat, lon) aqi = fetch_aqi(lat, lon) - # Use manual/env place if provided; otherwise reverse geocode place_effective = MANUAL_PLACE or ENV_PLACE or fetch_place(lat, lon) write_api_cache({"forecast": forecast, "aqi": aqi, "place": place_effective}) - out, simple = build_output(lat, lon, forecast, aqi, place_effective) - print(json.dumps(out, ensure_ascii=False)) - write_simple_text_cache(simple) + return build_output(Location(lat, lon, place_effective), forecast, aqi) except Exception as e: print(f"Open-Meteo fetch failed: {e}", file=sys.stderr) - # Last resort: try stale cache without TTL - try: - if os.path.exists(API_CACHE_PATH): - with open(API_CACHE_PATH, "r", encoding="utf-8") as f: - stale = json.load(f) - out, simple = build_output(lat, lon, stale.get("forecast", {}), stale.get("aqi"), stale.get("place") if isinstance(stale.get("place"), str) else None) - print(json.dumps(out, ensure_ascii=False)) - write_simple_text_cache(simple) - return - except Exception as e2: - print(f"Failed to use stale cache: {e2}", file=sys.stderr) - # Fallback minimal output - fallback = { - "text": f"{WEATHER_ICONS['default']} N/A", - "alt": "Unavailable", - "tooltip": "Weather unavailable", - "class": "unavailable", - } - print(json.dumps(fallback, ensure_ascii=False)) + return None + + +def try_stale_weather(lat: float, lon: float) -> Optional[Tuple[Dict[str, str], str]]: + try: + if API_CACHE_PATH.exists(): + with API_CACHE_PATH.open("r", encoding="utf-8") as f: + stale = json.load(f) + stale_dict = ensure_dict(stale) + place_val = stale_dict.get("place") + place = place_val if isinstance(place_val, str) else None + forecast = cast(Optional[Dict[str, Any]], stale_dict.get("forecast")) + aqi = cast(Optional[Dict[str, Any]], stale_dict.get("aqi")) + return build_output(Location(lat, lon, place), forecast, aqi) + except Exception as e2: + print(f"Failed to use stale cache: {e2}", file=sys.stderr) + return None + + +def main() -> None: + lat, lon = get_coords() + + # Try cache first + result = try_cached_weather(lat, lon) + if result: + out, simple = result + print(json.dumps(out, ensure_ascii=False)) + write_simple_text_cache(simple) + return + + # Fetch fresh + result = fetch_fresh_weather(lat, lon) + if result: + out, simple = result + print(json.dumps(out, ensure_ascii=False)) + write_simple_text_cache(simple) + return + + # Last resort: try stale cache + result = try_stale_weather(lat, lon) + if result: + out, simple = result + print(json.dumps(out, ensure_ascii=False)) + write_simple_text_cache(simple) + return + + # Fallback minimal output + fallback = { + "text": f"{WEATHER_ICONS['default']} N/A", + "alt": "Unavailable", + "tooltip": "Weather unavailable", + "class": "unavailable", + } + print(json.dumps(fallback, ensure_ascii=False)) + + +def test_coerce_functions(): + """Manual testing for coerce functions.""" + # Test coerce_int + assert coerce_int(5) == 5 + assert coerce_int(5.5) == 5 + assert coerce_int("5") == 5 + assert coerce_int("5.7") == 5 + assert coerce_int("abc") is None + assert coerce_int(None) is None + + # Test coerce_float + assert coerce_float(5.5) == 5.5 + assert coerce_float(5) == 5.0 + assert coerce_float("5.5") == 5.5 + assert coerce_float("abc") is None + assert coerce_float(None) is None + + # Test coerce_number + assert coerce_number(5) == 5 + assert coerce_number(5.5) == 5.5 + assert coerce_number("5") == 5 + assert coerce_number("5.5") == 5.5 + assert coerce_number("abc") is None + + print("All coerce function tests passed.", file=sys.stderr) if __name__ == "__main__": - main() + if len(sys.argv) > 1 and sys.argv[1] == "--test": + test_coerce_functions() + else: + main() |
