mirror of
https://github.com/unitedstates/congress-legislators.git
synced 2025-12-19 09:50:37 -05:00
This can happen when new legislators are added before a bioguide ID is published by the House, especially when staging election results before the next Congress has begun.
226 lines
6.9 KiB
Python
226 lines
6.9 KiB
Python
"""
|
|
|
|
Run validation tests on district office data.

For each legislator:
    has offices

For each office:
    Required fields: id, city, state
    Expected fields: address, city, state, zip, phone, latitude, longitude, id
    Optional fields: building, fax, hours, suite
    Office id: check consistent
    offices are in legislator's state

Globally:
    Every legislator has offices
    All offices belong to current legislators
|
|
|
|
"""
|
|
|
|
import datetime
|
|
import os.path
|
|
import re
|
|
from collections import OrderedDict, defaultdict
|
|
from itertools import count
|
|
import sys
|
|
|
|
try:
|
|
import rtyaml as yaml
|
|
except ImportError:
|
|
import yaml
|
|
|
|
try:
|
|
from termcolor import colored
|
|
except ImportError:
|
|
colored = None
|
|
|
|
|
|
# Matches a single non-word character; used to normalise city names into ids.
NONALPHA = re.compile(r"\W")

# Phone and fax numbers are expected in the form ddd-ddd-dddd.
PHONE = re.compile(r"^\d{3}-\d{3}-\d{4}$")

# Order in which the known fields of an office are expected to appear.
FIELD_ORDER = [
    "id",
    "address", "suite", "building",
    "city", "state", "zip",
    "latitude", "longitude",
    "fax", "hours", "phone",
]
|
|
|
|
|
|
def relfile(path):
    """Return the absolute form of *path*, taken relative to this script's directory."""
    script_dir = os.path.dirname(__file__)
    joined = os.path.join(script_dir, path)
    return os.path.abspath(joined)
|
|
|
|
|
|
def id_offices(bioguide_id, offices):
    """
    Generate unique office ids using a similar algorithm to
    https://github.com/controlshift/congress-legislators/blob/add-ids-to-offices-script/add_ids_to_offices.rb

    Used for validation here, but could be used to generate ids.

    bioguide_id: id string used as the prefix of every generated office id.
    offices: iterable of office mappings; only the ``city`` key is read.

    Yields ``(office_id, office)`` pairs in input order. The id is
    ``<bioguide_id>-<city>`` with the city lower-cased and every non-word
    character replaced by ``_``; the second and later offices that map to the
    same id get a ``-1``, ``-2``, ... suffix. A missing or null city is
    treated as ``no_city``.
    """
    # One independent counter per base id; the first use yields 0 (no suffix).
    id_count = defaultdict(count)
    for office in offices:
        city = office.get('city')
        if city is None:
            # Key absent or explicitly null: fall back instead of crashing on
            # .lower(), so the caller can still report the missing field.
            city = 'no_city'
        # str() keeps a non-string city (e.g. a bare number) from raising here.
        locality = NONALPHA.sub('_', str(city).lower())

        office_id = '-'.join([bioguide_id, locality])

        city_count = next(id_count[office_id])
        if city_count:
            office_id = '-'.join([office_id, str(city_count)])

        yield office_id, office
|
|
|
|
|
|
def check_legislator_offices(legislator_offices, legislator):
    """
    Validate the offices listed for a single legislator.

    legislator_offices: mapping with an ``id`` mapping (which must contain
        ``bioguide``) and an optional ``offices`` list of office mappings.
    legislator: the matching current-legislator record, or a falsy value
        (e.g. None) when the bioguide id has no current legislator.

    Returns ``(errors, warnings)``: two lists of human-readable messages.
    """
    bioguide_id = legislator_offices['id']['bioguide']
    # "or []" also covers an ``offices`` key that is present but null.
    offices = legislator_offices.get('offices') or []

    state = None
    if legislator:
        # The last listed term supplies the state that offices are checked against.
        state = legislator['terms'][-1]['state']

    required = ['id', 'city', 'state']
    expected = ['address', 'zip', 'phone', 'latitude', 'longitude']
    optional = ['building', 'suite', 'hours', 'fax']
    all_fields = set(required + expected + optional)

    errors = []
    warnings = []

    if not legislator:
        errors.append("Offices for inactive legislator")

    if not offices:
        errors.append("Zero offices")

    for office_id, office in id_offices(bioguide_id, offices):

        for field in required:
            if not office.get(field):
                errors.append("Office %s is missing required field '%s'" % (office_id, field))

        for field in expected:
            if not office.get(field):
                warnings.append("Office %s is missing field '%s'" % (office_id, field))

        for field in office:
            if field not in all_fields:
                errors.append("Office %s has unrecognized field '%s'" % (office_id, field))
            if not office.get(field):
                warnings.append("Office %s has empty field %s" % (office_id, field))

        # The stored id must equal the id regenerated from bioguide id and city.
        found_id = office.get('id')
        if found_id and office_id != found_id:
            errors.append("Office %s has unexpected id '%s'" % (office_id, found_id))

        office_state = office.get('state')
        if state and office_state and office_state != state:
            errors.append("Office %s is in '%s', legislator is from '%s'" % (office_id, office_state, state))

        office_zip = office.get('zip')
        if office_zip is not None and not isinstance(office_zip, str):
            errors.append("Office %s has non-string zip: %s" % (office_id, office_zip))

        phone = office.get('phone')
        fax = office.get('fax')

        # A non-string value (e.g. an unquoted number) is reported as a format
        # error rather than letting the regex match raise TypeError.
        if phone and not (isinstance(phone, str) and PHONE.match(phone)):
            errors.append("Office %s phone '%s' does not match format ddd-ddd-dddd" % (office_id, phone))

        if fax and not (isinstance(fax, str) and PHONE.match(fax)):
            errors.append("Office %s fax '%s' does not match format ddd-ddd-dddd" % (office_id, fax))

        if (office.get('address') and
                not (office.get('latitude') and office.get('longitude'))):
            warnings.append("Office %s missing geocode" % office_id)

        if not office.get('address') and not office.get('phone'):
            errors.append("Office %s needs at least address or phone" % office_id)

        fields = [f for f in office if f in FIELD_ORDER]  # unknown fields checked above
        sorted_fields = sorted(fields, key=FIELD_ORDER.index)
        if fields != sorted_fields:
            warnings.append("Office %s fields out of order, expected %s" % (office_id, sorted_fields))

    return errors, warnings
|
|
|
|
|
|
def load_to_dict(path):
    """
    Load a YAML list of records into an OrderedDict keyed by bioguide id.

    path: file path, resolved relative to this script via relfile().

    Returns an OrderedDict mapping bioguide id -> record, in file order.
    Records whose ``id`` mapping has no ``bioguide`` key are skipped.
    """
    # The context manager closes the file handle as soon as parsing finishes.
    with open(relfile(path), encoding="utf-8") as stream:
        # NOTE(review): ``yaml`` is rtyaml when installed, otherwise PyYAML.
        # Recent PyYAML releases require an explicit Loader for load(); confirm
        # the fallback import still works. Only load trusted, repo-local files.
        records = yaml.load(stream)
    return OrderedDict(
        (record['id']['bioguide'], record)
        for record in records
        if 'bioguide' in record['id']
    )
|
|
|
|
|
|
def print_issues(legislator, errors, warnings):
    """
    Print a header line for one legislator followed by its issues.

    legislator: either a plain string, printed as-is, or a legislator record
        from which a summary line (bioguide id, state, type, name, url) is built.
    errors, warnings: lists of messages; nothing at all is printed when both
        are empty. Messages are coloured when termcolor is available.
    """
    if not errors and not warnings:
        return

    if isinstance(legislator, str):
        header = legislator
    else:
        last_term = legislator['terms'][-1]
        header = "{} [{} {}] {} ({})".format(
            legislator['id']['bioguide'],
            last_term['state'],
            last_term['type'],
            legislator['name'].get('official_full'),
            last_term.get('url', 'no url'),
        )
    print(header)

    # Errors are listed first, then warnings, each with its own prefix and colour.
    groups = (
        (" ERROR: {}", "red", errors),
        (" WARNING: {}", "yellow", warnings),
    )
    for template, colour, messages in groups:
        for message in messages:
            line = template.format(message)
            print(colored(line, colour) if colored else line)

    print("")
|
|
|
|
|
|
def run(skip_warnings=False):
    """
    Validate the district offices file against the current legislators file.

    Every entry in the offices file is checked and its issues printed. Then
    each current legislator with no offices entry is reported as "No offices":
    an error once their latest term started at least 60 days ago, otherwise
    only a warning.

    skip_warnings: when true, warnings are dropped and only errors are printed.

    Returns True if any error was reported, otherwise False.
    """
    legislators = load_to_dict("../legislators-current.yaml")
    legislators_offices = load_to_dict("../legislators-district-offices.yaml")

    has_errors = False

    for bioguide_id, legislator_offices in legislators_offices.items():
        legislator = legislators.get(bioguide_id)

        errors, warnings = check_legislator_offices(legislator_offices, legislator)

        if skip_warnings:
            warnings = []

        if errors:
            has_errors = True

        # Fall back to the bare id when there is no current-legislator record.
        print_issues(legislator or bioguide_id, errors, warnings)

    today = datetime.date.today()
    grace_period = datetime.timedelta(60)

    # Walk the legislators in file order instead of an unordered set
    # difference, so the report order is the same on every run.
    for bioguide_id, legislator in legislators.items():
        if bioguide_id in legislators_offices:
            continue

        # Only report an error for a missing office if the
        # legislator has been in office for at least 60 days.
        start_date = legislator['terms'][-1]['start']
        # Some YAML loaders hand back date objects for ISO dates; accept
        # both those and plain 'YYYY-MM-DD' strings.
        if isinstance(start_date, datetime.datetime):
            start_date = start_date.date()
        elif not isinstance(start_date, datetime.date):
            start_date = datetime.datetime.strptime(start_date, '%Y-%m-%d').date()

        if today - start_date >= grace_period:
            has_errors = True
            errors, warnings = ["No offices"], []
        else:
            errors, warnings = [], ["No offices"]

        # Honour --skip-warnings here too, consistent with the loop above.
        if skip_warnings:
            warnings = []

        print_issues(legislator, errors, warnings)

    return has_errors
|
|
|
|
if __name__ == '__main__':
    import argparse

    # Command line: a single optional switch that suppresses warnings.
    cli = argparse.ArgumentParser()
    cli.add_argument("--skip-warnings", action="store_true")
    options = cli.parse_args()

    # Exit status 1 signals that at least one error was reported.
    if run(skip_warnings=options.skip_warnings):
        sys.exit(1)
    sys.exit(0)
|