Files
congress-legislators/scripts/office_validator.py
Joshua Tauberer d1bf22b275 Fix validation to not crash if a legislator doesn't have a bioguide ID
This can happen when new legislators are added before a bioguide ID is published by the House, especially when staging election results before the next Congress has begun.
2022-12-25 08:37:34 -05:00

226 lines
6.9 KiB
Python

"""
Run validation tests on district office data.

For each legislator:
    has offices

For each office:
    Required fields: id, city, state
    Expected fields: address, city, state, zip, phone, latitude, longitude, id
    Optional fields: building, fax, hours, suite
    Office id: check consistent
    offices are in legislator's state

Globally:
    Every legislator has offices
    All offices belong to current legislators
"""
import datetime
import os.path
import re
from collections import OrderedDict, defaultdict
from itertools import count
import sys
try:
import rtyaml as yaml
except ImportError:
import yaml
try:
from termcolor import colored
except ImportError:
colored = None
# Matches a single non-word character; used to normalise city names in office ids.
NONALPHA = re.compile(r"\W")

# Phone and fax numbers must be written as ddd-ddd-dddd.
PHONE = re.compile(r"^\d{3}-\d{3}-\d{4}$")

# Preferred order of the keys inside one office entry.
FIELD_ORDER = [
    'id',
    'address', 'suite', 'building',
    'city', 'state', 'zip',
    'latitude', 'longitude',
    'fax', 'hours', 'phone',
]
def relfile(path):
    """Return *path* as an absolute path, resolved against this script's directory."""
    script_dir = os.path.dirname(__file__)
    joined = os.path.join(script_dir, path)
    return os.path.abspath(joined)
def id_offices(bioguide_id, offices):
    """
    Yield ``(office_id, office)`` for every office, in the order given.

    Generate unique office ids using a similar algorithm to
    https://github.com/controlshift/congress-legislators/blob/add-ids-to-offices-script/add_ids_to_offices.rb

    The id is ``<bioguide_id>-<city>``, with the city lower-cased and each
    non-word character replaced by ``_``. The second and later offices that
    map to the same id get ``-1``, ``-2``, ... appended.

    Used for validation here, but could be used to generate ids.
    """
    id_count = defaultdict(count)
    for office in offices:
        # Treat a city that is present but empty (None or '') like a missing
        # one. ``.get('city', 'no_city')`` would hand back the None and the
        # ``.lower()`` call would raise before the caller can report the
        # missing required field. ``str()`` likewise keeps a non-string city
        # from aborting the whole validation run.
        locality = str(office.get('city') or 'no_city').lower()
        locality = NONALPHA.sub('_', locality)

        office_id = '-'.join([bioguide_id, locality])

        # The counter starts at 0, so only repeats of an id get a suffix.
        city_count = next(id_count[office_id])
        if city_count:
            office_id = '-'.join([office_id, str(city_count)])

        yield office_id, office
def check_legislator_offices(legislator_offices, legislator):
    """
    Validate one legislator's entry from the district offices data.

    legislator_offices: mapping holding ``id.bioguide`` and, optionally, an
        ``offices`` list of office mappings.
    legislator: the matching current-legislator record, or a falsy value when
        there is none (reported as "Offices for inactive legislator").

    Returns a ``(errors, warnings)`` tuple of two lists of message strings.
    """
    bioguide_id = legislator_offices['id']['bioguide']
    # An ``offices`` key that is present but empty comes back as None from
    # ``.get('offices', [])`` and would fail on iteration; normalise it so it
    # is reported as "Zero offices" instead.
    offices = legislator_offices.get('offices') or []

    state = None
    if legislator:
        # Offices are compared against the state of the most recent term.
        state = legislator['terms'][-1]['state']

    required = ['id', 'city', 'state']
    expected = ['address', 'zip', 'phone', 'latitude', 'longitude']
    optional = ['building', 'suite', 'hours', 'fax']
    all_fields = set(required + expected + optional)

    errors = []
    warnings = []

    if not legislator:
        errors.append("Offices for inactive legislator")

    if not offices:
        errors.append("Zero offices")

    # office_id is the id the office *should* have, derived from its city.
    for office_id, office in id_offices(bioguide_id, offices):

        for field in required:
            if not office.get(field):
                errors.append("Office %s is missing required field '%s'" % (office_id, field))

        for field in expected:
            if not office.get(field):
                warnings.append("Office %s is missing field '%s'" % (office_id, field))

        for field in office:
            if field not in all_fields:
                errors.append("Office %s has unrecognized field '%s'" % (office_id, field))
            if not office.get(field):
                warnings.append("Office %s has empty field %s" % (office_id, field))

        found_id = office.get('id')
        if found_id and office_id != found_id:
            errors.append("Office %s has unexpected id '%s'" % (office_id, found_id))

        office_state = office.get('state')
        if state and office_state and office_state != state:
            errors.append("Office %s is in '%s', legislator is from '%s'" % (office_id, office_state, state))

        office_zip = office.get('zip')
        if office_zip is not None and not isinstance(office_zip, str):
            errors.append("Office %s has non-string zip: %s" % (office_id, office_zip))

        phone = office.get('phone')
        fax = office.get('fax')

        # A number that is not a string (e.g. an unquoted digit run) cannot be
        # matched by the regex; report it as a format error rather than crash.
        if phone and not (isinstance(phone, str) and PHONE.match(phone)):
            errors.append("Office %s phone '%s' does not match format ddd-ddd-dddd" % (office_id, phone))

        if fax and not (isinstance(fax, str) and PHONE.match(fax)):
            errors.append("Office %s fax '%s' does not match format ddd-ddd-dddd" % (office_id, fax))

        if (office.get('address') and
                not (office.get('latitude') and office.get('longitude'))):
            warnings.append("Office %s missing geocode" % office_id)

        if not office.get('address') and not office.get('phone'):
            errors.append("Office %s needs at least address or phone" % office_id)

        fields = [f for f in office if f in FIELD_ORDER]  # unknown fields checked above
        sorted_fields = sorted(fields, key=FIELD_ORDER.index)
        if fields != sorted_fields:
            warnings.append("Office %s fields out of order, expected %s" % (office_id, sorted_fields))

    return errors, warnings
def load_to_dict(path):
    """
    Load a YAML list of records into an OrderedDict keyed by bioguide id.

    path is resolved relative to this script's directory. Records whose
    ``id`` mapping has no ``bioguide`` key are left out; the remaining
    records keep their order from the file.
    """
    # Use a context manager so the file handle is closed as soon as parsing
    # finishes, and pin the encoding so decoding does not depend on the
    # platform default.
    with open(relfile(path), encoding="utf-8") as stream:
        # NOTE(review): when the plain PyYAML fallback is in use, recent
        # releases require an explicit Loader argument to yaml.load --
        # confirm which versions this script is expected to run with.
        records = yaml.load(stream)
    return OrderedDict(
        (record['id']['bioguide'], record)
        for record in records
        if 'bioguide' in record['id']
    )
def print_issues(legislator, errors, warnings):
    """
    Print a header line followed by the given errors and warnings.

    legislator is either a string, printed as-is, or a legislator record from
    which a summary of its bioguide id, latest term and name is built.
    Nothing is printed when both lists are empty. Each message is passed
    through `colored` (errors red, warnings yellow) when that is available.
    """
    if not errors and not warnings:
        return

    if isinstance(legislator, str):
        header = legislator
    else:
        latest_term = legislator['terms'][-1]
        header = "{} [{} {}] {} ({})".format(
            legislator['id']['bioguide'],
            latest_term['state'],
            latest_term['type'],
            legislator['name'].get('official_full'),
            latest_term.get('url', 'no url'),
        )
    print(header)

    # Errors first, then warnings; each severity has its own prefix and colour.
    severities = (
        (errors, " ERROR: {}", "red"),
        (warnings, " WARNING: {}", "yellow"),
    )
    for messages, template, color in severities:
        for message in messages:
            line = template.format(message)
            if colored:
                line = colored(line, color)
            print(line)

    # Blank line to separate this legislator from the next.
    print("")
def run(skip_warnings=False):
    """
    Validate every entry in the district offices file and print the findings.

    First each entry in ``legislators-district-offices.yaml`` is checked
    against its record in ``legislators-current.yaml``. Then each current
    legislator with no offices entry is reported: as an error once the latest
    term started at least 60 days ago, otherwise as a warning.

    skip_warnings: when true, warnings are not printed at all.

    Returns True if any error was found, else False.
    """
    legislators = load_to_dict("../legislators-current.yaml")
    legislators_offices = load_to_dict("../legislators-district-offices.yaml")

    has_errors = False

    for bioguide_id, legislator_offices in legislators_offices.items():
        legislator = legislators.get(bioguide_id)
        errors, warnings = check_legislator_offices(legislator_offices, legislator)

        if skip_warnings:
            warnings = []

        if errors:
            has_errors = True

        # Fall back to the bare id when there is no current record to describe.
        print_issues(legislator or bioguide_id, errors, warnings)

    today = datetime.date.today()
    grace_period = datetime.timedelta(days=60)

    # Walk the legislators in file order rather than over a set difference,
    # so the report order is the same on every run.
    for bioguide_id, legislator in legislators.items():
        if bioguide_id in legislators_offices:
            continue

        # Only report an error for a missing office if the
        # legislator has been in office for at least 60 days.
        start_date = legislator['terms'][-1]['start']
        started = datetime.datetime.strptime(start_date, '%Y-%m-%d').date()
        if today - started >= grace_period:
            has_errors = True
            errors, warnings = ["No offices"], []
        elif skip_warnings:
            # Honour skip_warnings here too, as the loop above does.
            errors, warnings = [], []
        else:
            errors, warnings = [], ["No offices"]

        print_issues(legislator, errors, warnings)

    return has_errors
if __name__ == '__main__':
    import argparse

    # Command-line entry point: the only option is --skip-warnings.
    cli = argparse.ArgumentParser()
    cli.add_argument("--skip-warnings", action="store_true")
    options = cli.parse_args()

    # Exit status 1 signals that at least one error was reported.
    found_errors = run(skip_warnings=options.skip_warnings)
    if found_errors:
        sys.exit(1)
    sys.exit(0)