radian_to_km = 6372.795484
radian_to_mi = 3959.871528
-def pyversion(ref=None):
- """Determine the Python version and optionally compare to a reference."""
- import platform
- ver = platform.python_version()
- if ref:
- return [
- int(x) for x in ver.split(".")[:2]
- ] >= [
- int(x) for x in ref.split(".")[:2]
- ]
- else: return ver
-
class Selections:
"""An object to contain selection data."""
def __init__(self):
cachedir="."
):
"""Return a string containing the results of a URI GET."""
- if pyversion("3"):
- import urllib, urllib.error, urllib.request
- URLError = urllib.error.URLError
- urlopen = urllib.request.urlopen
- else:
- import urllib2 as urllib
- URLError = urllib.URLError
- urlopen = urllib.urlopen
- import os, time
+ import os, time, urllib, urllib.error, urllib.request
if cache_data:
dcachedir = os.path.join( os.path.expanduser(cachedir), "datacache" )
if not os.path.exists(dcachedir):
dcache_fd.close()
else:
try:
- data = urlopen(uri).read().decode("utf-8")
- except URLError:
+ data = urllib.request.urlopen(uri).read().decode("utf-8")
+ except urllib.error.URLError:
if ignore_fail: return ""
import os, sys
sys.stderr.write("%s error: failed to retrieve\n %s\n\n" % (
cacheage=cacheage,
cachedir=cachedir
)
- if pyversion("3") and type(metar) is bytes: metar = metar.decode("utf-8")
+ if type(metar) is bytes: metar = metar.decode("utf-8")
if verbose: return metar
else:
import re
quiet=False,
cache_data=False,
cacheage=900,
- cachedir="."
+ cachedir=".",
+ delay=1
):
"""Return alert notice for the specified URI."""
if not uri:
cacheage=cacheage,
cachedir=cachedir
).strip()
- if pyversion("3") and type(alert) is bytes: alert = alert.decode("utf-8")
+ if type(alert) is bytes: alert = alert.decode("utf-8")
if alert:
if verbose: return alert
else:
- if alert.find("\nNATIONAL WEATHER SERVICE") == -1:
- muted = False
- else:
+ import re
+ if re.search(r"\nNational Weather Service", alert):
muted = True
+ else:
+ muted = False
+ expirycheck = re.search(r"Expires:([0-9]{12})", alert)
+ if expirycheck:
+ # only report alerts and forecasts that expired less than delay
+ # hours ago
+ import datetime, zoneinfo
+ expiration = datetime.datetime.fromisoformat(
+ "%sT%sZ" % (expirycheck[1][:8], expirycheck[1][-4:]))
+ now = datetime.datetime.now(tz=zoneinfo.ZoneInfo("UTC"))
+ if now - expiration > datetime.timedelta(hours=delay):
+ return ""
lines = alert.split("\n")
- import time
- # TODO: make this offset configurable
- # TODO: adjust offset relative to the difference between the user's
- # local time and the zone's local time (will need to extend
- # the schema in the zones file to store each tz
- offset = 86400 # one day
-
- # report alerts and forecasts that expired less than offset ago;
- # this is a cheap hack since expiration times seem to be relative
- # to the zone's local timezone, and converting from the user's
- # would get complicated, but also there can sometimes be a lag
- # between expiration and the next update
- valid_time = time.strftime(
- "%Y%m%d%H%M", time.localtime(time.time() - offset))
output = []
for line in lines:
- if line.startswith("Expires:") \
- and "Expires:" + valid_time > line:
- return ""
- if muted and line.startswith("NATIONAL WEATHER SERVICE"):
+ if muted and line.startswith("National Weather Service"):
muted = False
line = ""
elif line == "&&":
default=default_cachedir,
help="directory for storing cached searches and data")
+ # the --delay option
+ if config.has_option("default", "delay"):
+ default_delay = config.getint("default", "delay")
+ else: default_delay = 1
+ option_parser.add_option("--delay",
+ dest="delay",
+ default=default_delay,
+ help="hours to delay alert and forecast expiration")
+
# the -f/--forecast option
if config.has_option("default", "forecast"):
default_forecast = config.getboolean("default", "forecast")
def get_config():
"""Parse the aliases and configuration."""
- if pyversion("3"): import configparser
- else: import ConfigParser as configparser
+ import configparser, os
config = configparser.ConfigParser()
- import os
rcfiles = [
"/etc/weatherrc",
"/etc/weather/weatherrc",
]
for rcfile in rcfiles:
if os.access(rcfile, os.R_OK):
- if pyversion("3"):
- config.read(rcfile, encoding="utf-8")
- else:
- config.read(rcfile)
+ config.read(rcfile, encoding="utf-8")
for section in config.sections():
if section != section.lower():
if config.has_section(section.lower()):
def integrate_search_cache(config, cachedir, setpath):
"""Add cached search results into the configuration."""
- if pyversion("3"): import configparser
- else: import ConfigParser as configparser
- import os, time
+ import configparser, os, time
scache_fn = os.path.join( os.path.expanduser(cachedir), "searches" )
if not os.access(scache_fn, os.R_OK): return config
scache_fd = open(scache_fn)
pass
return config
scache = configparser.ConfigParser()
- if pyversion("3"):
- scache.read(scache_fn, encoding="utf-8")
- else:
- scache.read(scache_fn)
+ scache.read(scache_fn, encoding="utf-8")
for section in scache.sections():
if not config.has_section(section):
config.add_section(section)
quiet=False
):
"""Find URIs using airport, gecos, placename, station, ZCTA/ZIP, zone."""
- import codecs, datetime, time, os, re, sys
- if pyversion("3"): import configparser
- else: import ConfigParser as configparser
+ import codecs, configparser, datetime, time, os, re, sys
datafiles = data_index(path)
if re.match("[A-Za-z]{3}$", expression): searchtype = "airport"
elif re.match("[A-Za-z0-9]{4}$", expression): searchtype = "station"
datafile = datafiles[dataname][0]
if datafile.endswith(".gz"):
import gzip
- if pyversion("3"):
- stations.read_string(
- gzip.open(datafile).read().decode("utf-8") )
- else: stations.read_file( gzip.open(datafile) )
+ stations.read_string( gzip.open(datafile).read().decode("utf-8") )
else:
- if pyversion("3"):
- stations.read(datafile, encoding="utf-8")
- else:
- stations.read(datafile)
+ stations.read(datafile, encoding="utf-8")
else:
message = "%s error: can't find \"%s\" data file\n" % (
os.path.basename( sys.argv[0] ),
datafile = datafiles[dataname][0]
if datafile.endswith(".gz"):
import gzip
- if pyversion("3"):
- zones.read_string( gzip.open(datafile).read().decode("utf-8") )
- else: zones.read_file( gzip.open(datafile) )
+ zones.read_string( gzip.open(datafile).read().decode("utf-8") )
else:
- if pyversion("3"):
- zones.read(datafile, encoding="utf-8")
- else:
- zones.read(datafile)
+ zones.read(datafile, encoding="utf-8")
else:
message = "%s error: can't find \"%s\" data file\n" % (
os.path.basename( sys.argv[0] ),
datafile = datafiles[dataname][0]
if datafile.endswith(".gz"):
import gzip
- if pyversion("3"):
- airports.read_string(
- gzip.open(datafile).read().decode("utf-8") )
- else: airports.read_file( gzip.open(datafile) )
+ airports.read_string(
+ gzip.open(datafile).read().decode("utf-8") )
else:
- if pyversion("3"):
- airports.read(datafile, encoding="utf-8")
- else:
- airports.read(datafile)
+ airports.read(datafile, encoding="utf-8")
else:
message = "%s error: can't find \"%s\" data file\n" % (
os.path.basename( sys.argv[0] ),
datafile = datafiles[dataname][0]
if datafile.endswith(".gz"):
import gzip
- if pyversion("3"):
- zctas.read_string(
- gzip.open(datafile).read().decode("utf-8") )
- else: zctas.read_file( gzip.open(datafile) )
+ zctas.read_string( gzip.open(datafile).read().decode("utf-8") )
else:
- if pyversion("3"):
- zctas.read(datafile, encoding="utf-8")
- else:
- zctas.read(datafile)
+ zctas.read(datafile, encoding="utf-8")
else:
message = "%s error: can't find \"%s\" data file\n" % (
os.path.basename( sys.argv[0] ),
datafile = datafiles[dataname][0]
if datafile.endswith(".gz"):
import gzip
- if pyversion("3"):
- places.read_string(
- gzip.open(datafile).read().decode("utf-8") )
- else: places.read_file( gzip.open(datafile) )
+ places.read_string( gzip.open(datafile).read().decode("utf-8") )
else:
- if pyversion("3"):
- places.read(datafile, encoding="utf-8")
- else:
- places.read(datafile)
+ places.read(datafile, encoding="utf-8")
else:
message = "%s error: can't find \"%s\" data file\n" % (
os.path.basename( sys.argv[0] ),
)
try:
scache_existing = configparser.ConfigParser()
- if pyversion("3"):
- scache_existing.read(scache_fn, encoding="utf-8")
- else:
- scache_existing.read(scache_fn)
+ scache_existing.read(scache_fn, encoding="utf-8")
if not scache_existing.has_section(search[0]):
scache_fd = codecs.open(scache_fn, "a", "utf-8")
scache_fd.writelines(search_cache)
return tuple(coordinates)
def correlate():
- import codecs, csv, datetime, hashlib, os, re, sys, time, zipfile
- if pyversion("3"): import configparser
- else: import ConfigParser as configparser
+ import codecs, configparser, csv, datetime, hashlib, os, re, sys, time
+ import zipfile, zoneinfo
for filename in os.listdir("."):
if re.match("[0-9]{4}_Gaz_counties_national.zip$", filename):
gcounties_an = filename
zones[zone]["zone_forecast"] = (
"https://tgftp.nws.noaa.gov/data/forecasts/zone/"
"%s/%s.txt" % (state.lower(), zone))
+ tzcode = fields[7]
+ if tzcode == "A":
+ zones[zone]["tz"] = "US/Alaska"
+ elif tzcode == "AH":
+ zones[zone]["tz"] = "US/Aleutian"
+ elif tzcode in ("C", "CE", "CM"):
+ zones[zone]["tz"] = "US/Central"
+ elif tzcode in ("E", "e"):
+ zones[zone]["tz"] = "US/Eastern"
+ elif tzcode == "F":
+ zones[zone]["tz"] = "Pacific/Guadalcanal"
+ elif tzcode == "G":
+ zones[zone]["tz"] = "Pacific/Guam"
+ elif tzcode == "H":
+ zones[zone]["tz"] = "US/Hawaii"
+ elif tzcode == "J":
+ zones[zone]["tz"] = "Japan"
+ elif tzcode == "K":
+ zones[zone]["tz"] = "Pacific/Kwajalein"
+ elif tzcode in ("M", "MC", "MP"):
+ zones[zone]["tz"] = "US/Mountain"
+ elif tzcode == "m":
+ zones[zone]["tz"] = "US/Arizona"
+ elif tzcode == "P":
+ zones[zone]["tz"] = "US/Pacific"
+ elif tzcode == "S":
+ zones[zone]["tz"] = "US/Samoa"
+ elif tzcode == "V":
+ zones[zone]["tz"] = "America/Virgin"
+ else:
+ zones[zone]["tz"] = ""
county = fields[5]
if county:
if description.endswith(county):
sys.stdout.write(message)
sys.stdout.flush()
airports = configparser.ConfigParser()
- if pyversion("3"):
- airports.read(airports_fn, encoding="utf-8")
- else:
- airports.read(airports_fn)
+ airports.read(airports_fn, encoding="utf-8")
places = configparser.ConfigParser()
- if pyversion("3"):
- places.read(places_fn, encoding="utf-8")
- else:
- places.read(places_fn)
+ places.read(places_fn, encoding="utf-8")
stations = configparser.ConfigParser()
- if pyversion("3"):
- stations.read(stations_fn, encoding="utf-8")
- else:
- stations.read(stations_fn)
+ stations.read(stations_fn, encoding="utf-8")
zctas = configparser.ConfigParser()
- if pyversion("3"):
- zctas.read(zctas_fn, encoding="utf-8")
- else:
- zctas.read(zctas_fn)
+ zctas.read(zctas_fn, encoding="utf-8")
zones = configparser.ConfigParser()
- if pyversion("3"):
- zones.read(zones_fn, encoding="utf-8")
- else:
- zones.read(zones_fn)
+ zones.read(zones_fn, encoding="utf-8")
qalog = []
places_nocentroid = 0
places_nodescription = 0
zctas_nocentroid += 1
zones_nocentroid = 0
zones_nodescription = 0
+ zones_notz = 0
zones_noforecast = 0
zones_overlapping = 0
zonetable = {}
if not zones.has_option(zone, "description"):
qalog.append("%s: no description\n" % zone)
zones_nodescription += 1
+ if not zones.has_option(zone, "tz") or not zones.get(
+ zone, "tz") in zoneinfo.available_timezones():
+ qalog.append("%s: no time zone\n" % zone)
+ zones_notz += 1
if not zones.has_option(zone, "zone_forecast"):
qalog.append("%s: no forecast\n" % zone)
zones_noforecast += 1
print(" %s zones with no centroid"%zones_nocentroid)
if zones_nodescription:
print(" %s zones with no description"%zones_nodescription)
+ if zones_notz:
+ print(" %s zones with no time zone"%zones_notz)
if zones_noforecast:
print(" %s zones with no forecast"%zones_noforecast)
if zones_overlapping: