License verification is a blocking requirement before any contractor touches a job. Done manually, a compliance team member visits a state board website, types in a license number, screenshots the result, and pastes it into a spreadsheet. At scale - say, onboarding 500 new contractors a month - that process breaks down fast. The ContractorVerify API replaces that manual workflow with a single HTTP call that returns structured, normalized data from every major state licensing board.
This tutorial walks through a complete Python integration: from your first authenticated request to a production-ready pipeline that reads a CSV, verifies in async batches, caches results in Redis, and writes everything to a SQLite database with a compliance summary report at the end.
Prerequisites
You need Python 3.10 or higher. The tutorial uses X | Y union type hints in annotations, which require 3.10 at runtime (f-strings and asyncio.run() work on older versions). Install the required packages:
pip install requests aiohttp python-dotenv redis sqlalchemy
Create a .env file in your project root. Never hardcode credentials in source files:
# .env
CONTRACTORVERIFY_API_KEY=your_api_key_here
REDIS_URL=redis://localhost:6379/0
DATABASE_URL=sqlite:///contractor_verification.db
Load these at the top of every script:
from dotenv import load_dotenv
import os
load_dotenv()
API_KEY = os.environ["CONTRACTORVERIFY_API_KEY"]
BASE_URL = "https://api.contractorverify.io/v1"
Add .env to your .gitignore before your first commit. Leaked API keys are a common source of unexpected billing and data exposure. Use environment variables or a secrets manager (AWS Secrets Manager, HashiCorp Vault) in production deployments.
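If you prefer to fail fast when a credential is missing, a small helper can replace the bare os.environ lookup. A minimal sketch (require_env and the CV_DEMO_KEY variable are illustrative names, not part of the API):

```python
import os

def require_env(name: str) -> str:
    # Raise a readable error at startup instead of a KeyError mid-pipeline.
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

# Illustration with a throwaway variable name:
os.environ["CV_DEMO_KEY"] = "demo-key"
print(require_env("CV_DEMO_KEY"))  # demo-key
```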
Single Contractor Lookup
The GET /verify endpoint accepts a state code and license number. This is the simplest integration - one call, one result.
import requests
import os
from dotenv import load_dotenv

load_dotenv()
API_KEY = os.environ["CONTRACTORVERIFY_API_KEY"]
BASE_URL = "https://api.contractorverify.io/v1"


def verify_contractor(state: str, license_number: str) -> dict:
    """
    Look up a single contractor license.
    Returns the parsed response dict on success.
    Raises ValueError on 404/422, RuntimeError on 429 and server errors.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Accept": "application/json",
    }
    params = {
        "state": state.upper(),
        "license_number": license_number.strip(),
    }
    response = requests.get(
        f"{BASE_URL}/verify",
        headers=headers,
        params=params,
        timeout=10,
    )
    if response.status_code == 404:
        raise ValueError(f"License {license_number} not found in {state}")
    if response.status_code == 422:
        detail = response.json().get("detail", "Validation error")
        raise ValueError(f"Invalid request: {detail}")
    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 60))
        raise RuntimeError(f"Rate limited. Retry after {retry_after}s")
    if not response.ok:
        raise RuntimeError(f"API error {response.status_code}: {response.text}")
    return response.json()


# Example usage
if __name__ == "__main__":
    result = verify_contractor("CA", "1052183")
    print(result)
Parsing the Response
A successful response contains all the fields you need to make a compliance decision. Here is what each field means and how to use it:
| Field | Type | Description |
|---|---|---|
| license_number | string | Normalized license number as returned by the state board |
| license_type | string | State-specific license type code (e.g., "B", "C-10", "RMO") |
| classification | string | Human-readable trade description (e.g., "General Building", "Electrical") |
| status | string | One of: ACTIVE, EXPIRED, SUSPENDED, REVOKED, CANCELLED, PENDING |
| issue_date | string (ISO 8601) | Date the license was originally issued |
| expiration_date | string (ISO 8601) | Date the license expires - key field for compliance checks |
| bond_amount | integer or null | Bond amount in USD cents; null if not required or not on file |
| disciplinary_actions | array | List of disciplinary action objects with date, type, and description |
| business_name | string | DBA or legal entity name on file with the state board |
| address | object | Street, city, state, zip as reported to the board |
| insurance_required | boolean | Whether the license type requires proof of insurance on file |
Parse the response into a typed dataclass for downstream processing:
from dataclasses import dataclass
from datetime import date, datetime
from typing import Optional


@dataclass
class DisciplinaryAction:
    action_date: str
    action_type: str
    description: str


@dataclass
class ContractorRecord:
    license_number: str
    license_type: str
    classification: str
    status: str
    issue_date: Optional[str]
    expiration_date: Optional[str]
    bond_amount: Optional[int]
    disciplinary_actions: list[DisciplinaryAction]
    business_name: str
    address: dict
    insurance_required: bool

    @classmethod
    def from_api(cls, data: dict) -> "ContractorRecord":
        actions = [
            DisciplinaryAction(
                action_date=a.get("date", ""),
                action_type=a.get("type", ""),
                description=a.get("description", ""),
            )
            for a in data.get("disciplinary_actions", [])
        ]
        return cls(
            license_number=data["license_number"],
            license_type=data["license_type"],
            classification=data["classification"],
            status=data["status"],
            issue_date=data.get("issue_date"),
            expiration_date=data.get("expiration_date"),
            bond_amount=data.get("bond_amount"),
            disciplinary_actions=actions,
            business_name=data.get("business_name", ""),
            address=data.get("address", {}),
            insurance_required=data.get("insurance_required", False),
        )

    def is_compliant(self) -> bool:
        """Returns True only if license is ACTIVE and not expired."""
        if self.status != "ACTIVE":
            return False
        if self.expiration_date:
            exp = datetime.fromisoformat(self.expiration_date).date()
            if exp < date.today():
                return False
        return True
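The expiration logic in is_compliant is worth sanity-checking on its own. A standalone re-statement for illustration (check_compliance mirrors the method above and assumes ISO 8601 date strings):

```python
from datetime import date, datetime
from typing import Optional

def check_compliance(status: str, expiration_date: Optional[str]) -> bool:
    # Standalone mirror of ContractorRecord.is_compliant, for illustration.
    if status != "ACTIVE":
        return False
    if expiration_date:
        if datetime.fromisoformat(expiration_date).date() < date.today():
            return False
    return True

print(check_compliance("ACTIVE", "2099-12-31"))     # True: active, not expired
print(check_compliance("ACTIVE", "2001-01-01"))     # False: expired
print(check_compliance("SUSPENDED", "2099-12-31"))  # False: wrong status
```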
Error Handling
Three HTTP status codes require special handling in contractor verification workflows:
404 Not Found - the license number does not exist in the state database. This is not necessarily fraud - the contractor may have used a different entity name, the license may be under an RMO (Responsible Managing Officer) rather than the business, or they may have the wrong state. Log the miss and flag for manual follow-up rather than hard-blocking.
422 Unprocessable Entity - your request was malformed. Common causes: invalid state abbreviation (use two-letter USPS codes), license number containing characters the state doesn't use, or a missing required parameter. The response body will contain a detail field with specifics.
429 Too Many Requests - you have exceeded your plan's rate limit. The response includes a Retry-After header with seconds to wait. Always respect this header rather than polling - continued requests during a rate limit window can result in temporary IP blocks.
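At the call site, the exception types raised by verify_contractor map cleanly onto these three outcomes. A sketch of caller-side triage (triage and fake_verify are illustrative helpers, not part of the API client):

```python
# Caller-side triage, assuming verify_contractor raises ValueError for
# 404/422 and RuntimeError for 429/5xx, as in the function defined earlier.
def triage(verify, state, license_number):
    try:
        return ("ok", verify(state, license_number))
    except ValueError as e:
        # Not found or malformed: flag for manual review, don't hard-block.
        return ("manual_review", str(e))
    except RuntimeError as e:
        # Rate limit or server error: safe to retry later.
        return ("retry_later", str(e))

# Stub standing in for a real API call (illustration only):
def fake_verify(state, lic):
    raise ValueError(f"License {lic} not found in {state}")

print(triage(fake_verify, "CA", "000000")[0])  # manual_review
```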
Batch Verification
For bulk onboarding - say, verifying a roster of 2,000 subcontractors before a large project - the POST /verify/batch endpoint accepts up to 100 license lookups per request. The following function reads a CSV, chunks it into batches of 100, and collects results:
import csv
import time
import requests

BATCH_SIZE = 100
BATCH_DELAY_SECONDS = 1.5  # be polite between sequential batches


def load_contractors_csv(path: str) -> list[dict]:
    """
    Expects CSV with columns: state, license_number
    Optional columns: contractor_id, name
    """
    contractors = []
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            contractors.append({
                "state": row["state"].strip().upper(),
                "license_number": row["license_number"].strip(),
                # Pass through any extra metadata for correlation
                "_meta": {k: v for k, v in row.items()
                          if k not in ("state", "license_number")},
            })
    return contractors


def chunk(lst: list, size: int):
    """Yield successive chunks of `size` from lst."""
    for i in range(0, len(lst), size):
        yield lst[i : i + size]


def batch_verify(contractors: list[dict]) -> list[dict]:
    """
    Send contractors to the batch endpoint in chunks of BATCH_SIZE.
    Returns a flat list of result dicts.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    results = []
    batches = list(chunk(contractors, BATCH_SIZE))
    print(f"Processing {len(contractors)} contractors in {len(batches)} batches...")
    for i, batch in enumerate(batches, start=1):
        print(f"  Batch {i}/{len(batches)} ({len(batch)} records)...")
        # Strip internal metadata before sending to API
        payload = [
            {"state": c["state"], "license_number": c["license_number"]}
            for c in batch
        ]
        response = requests.post(
            f"{BASE_URL}/verify/batch",
            headers=headers,
            json={"licenses": payload},
            timeout=30,
        )
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            print(f"  Rate limited - waiting {retry_after}s...")
            time.sleep(retry_after)
            # Retry this batch once
            response = requests.post(
                f"{BASE_URL}/verify/batch",
                headers=headers,
                json={"licenses": payload},
                timeout=30,
            )
        if not response.ok:
            raise RuntimeError(
                f"Batch {i} failed with status {response.status_code}: {response.text}"
            )
        batch_results = response.json().get("results", [])
        # Re-attach metadata for downstream correlation
        for j, result in enumerate(batch_results):
            result["_meta"] = batch[j].get("_meta", {})
        results.extend(batch_results)
        if i < len(batches):
            time.sleep(BATCH_DELAY_SECONDS)
    return results
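The chunking math is easy to verify in isolation. Re-stating the chunk helper so the snippet runs standalone:

```python
def chunk(lst, size):
    # Yield successive slices of `size` items; the last slice may be shorter.
    for i in range(0, len(lst), size):
        yield lst[i : i + size]

batches = list(chunk(list(range(250)), 100))
print([len(b) for b in batches])  # [100, 100, 50]
```

250 records therefore cost three batch requests, not two and a half: the final partial batch still counts against your request budget.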
Async Processing with aiohttp
The synchronous batch loop above is fast enough for most use cases, but if you need to verify thousands of contractors and can tolerate concurrent I/O, aiohttp lets you fire multiple batch requests in flight simultaneously. The key constraint: respect the rate limit by capping concurrency with a semaphore.
import asyncio
import aiohttp

CONCURRENT_BATCHES = 5  # tune based on your plan's rate limit


async def async_batch_verify(contractors: list[dict]) -> list[dict]:
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    semaphore = asyncio.Semaphore(CONCURRENT_BATCHES)
    results = []

    async def verify_one_batch(session, batch_index, batch):
        async with semaphore:
            payload = [
                {"state": c["state"], "license_number": c["license_number"]}
                for c in batch
            ]
            async with session.post(
                f"{BASE_URL}/verify/batch",
                headers=headers,
                json={"licenses": payload},
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status == 429:
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    await asyncio.sleep(retry_after)
                    # bubble up for retry - simplified; add a retry loop in prod
                    raise RuntimeError(f"Rate limited on batch {batch_index}")
                resp.raise_for_status()
                data = await resp.json()
                batch_results = data.get("results", [])
                for j, result in enumerate(batch_results):
                    result["_meta"] = batch[j].get("_meta", {})
                return batch_results

    batches = list(chunk(contractors, BATCH_SIZE))
    async with aiohttp.ClientSession() as session:
        tasks = [
            verify_one_batch(session, i, batch)
            for i, batch in enumerate(batches, start=1)
        ]
        batch_results_list = await asyncio.gather(*tasks, return_exceptions=True)

    for item in batch_results_list:
        if isinstance(item, Exception):
            print(f"Batch error: {item}")
        else:
            results.extend(item)
    return results
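The semaphore is doing the real work here: no matter how many batch tasks are scheduled, only CONCURRENT_BATCHES of them hold an HTTP request in flight at once. A minimal, network-free demonstration of that pattern (worker and the peak counter are illustrative):

```python
import asyncio

async def main():
    sem = asyncio.Semaphore(2)
    peak = {"now": 0, "max": 0}

    async def worker(i):
        async with sem:
            # Track how many workers are inside the critical section at once.
            peak["now"] += 1
            peak["max"] = max(peak["max"], peak["now"])
            await asyncio.sleep(0.01)  # stands in for the HTTP round trip
            peak["now"] -= 1

    await asyncio.gather(*(worker(i) for i in range(8)))
    return peak["max"]

print(asyncio.run(main()))  # 2 - never more than the semaphore allows
```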
Redis Caching Layer
License data does not change by the minute. An ACTIVE license that was current at 9am is almost certainly still current at 3pm. A 24-hour cache dramatically reduces API costs for workflows that re-check the same contractors across multiple runs - daily compliance sweeps, for example, or webhook-triggered re-checks that may fire multiple times for the same contractor.
import redis
import json
import os
import hashlib

redis_client = redis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379/0"))
CACHE_TTL_SECONDS = 86400  # 24 hours


def cache_key(state: str, license_number: str) -> str:
    raw = f"{state.upper()}:{license_number.strip().upper()}"
    return f"cv:license:{hashlib.sha256(raw.encode()).hexdigest()[:16]}:{raw}"


def get_cached(state: str, license_number: str) -> dict | None:
    key = cache_key(state, license_number)
    cached = redis_client.get(key)
    if cached:
        return json.loads(cached)
    return None


def set_cached(state: str, license_number: str, data: dict) -> None:
    key = cache_key(state, license_number)
    redis_client.setex(key, CACHE_TTL_SECONDS, json.dumps(data))


def verify_with_cache(state: str, license_number: str) -> dict:
    cached = get_cached(state, license_number)
    if cached:
        cached["_from_cache"] = True
        return cached
    result = verify_contractor(state, license_number)
    set_cached(state, license_number, result)
    result["_from_cache"] = False
    return result
One caveat: cached records will not reflect status changes or new disciplinary_actions entries until the TTL expires. Suspended licenses can become active again; the cache should not hide that transition from your compliance system.
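One way to handle this is a force-refresh escape hatch for compliance-critical checks. The sketch below is a variant of verify_with_cache with the fetcher and cache injected so the pattern can be exercised without a Redis server; force_refresh and DictCache are illustrative (DictCache mimics redis-py's setex(name, time, value) argument order but ignores the TTL):

```python
import json

def verify_with_cache(state, lic, fetch, cache, ttl=86400, force_refresh=False):
    """Cache-aside lookup with an escape hatch for compliance-critical checks."""
    key = f"cv:{state.upper()}:{lic.strip().upper()}"
    if not force_refresh:
        hit = cache.get(key)
        if hit is not None:
            record = json.loads(hit)
            record["_from_cache"] = True
            return record
    record = fetch(state, lic)
    cache.setex(key, ttl, json.dumps(record))
    record["_from_cache"] = False
    return record

class DictCache:
    """In-memory stand-in for redis.Redis in this demo (TTL is ignored)."""
    def __init__(self):
        self.data = {}
    def get(self, key):
        return self.data.get(key)
    def setex(self, key, ttl, value):  # same argument order as redis-py setex
        self.data[key] = value

cache = DictCache()
first = verify_with_cache("CA", "123", lambda s, l: {"status": "SUSPENDED"}, cache)
second = verify_with_cache("CA", "123", lambda s, l: {"status": "ACTIVE"}, cache)
fresh = verify_with_cache("CA", "123", lambda s, l: {"status": "ACTIVE"}, cache,
                          force_refresh=True)
print(first["status"], second["status"], fresh["status"])  # SUSPENDED SUSPENDED ACTIVE
```

The second call still returns the stale SUSPENDED record from cache; only the forced refresh sees the updated status.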
Database Persistence with SQLAlchemy
Raw API results belong in a database, not just a Redis cache. The cache is a speed layer; the database is your audit trail. Regulators, insurers, and platform trust-and-safety teams may all ask to see verification records with timestamps.
from sqlalchemy import create_engine, Column, String, Boolean, Integer, Text, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime, timezone
import json
import os

DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///contractor_verification.db")
engine = create_engine(DATABASE_URL, echo=False)
Session = sessionmaker(bind=engine)
Base = declarative_base()


class VerificationRecord(Base):
    __tablename__ = "verifications"

    id = Column(Integer, primary_key=True, autoincrement=True)
    verified_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    state = Column(String(2), nullable=False, index=True)
    license_number = Column(String(64), nullable=False, index=True)
    business_name = Column(String(256))
    license_type = Column(String(64))
    classification = Column(String(256))
    status = Column(String(32), nullable=False)
    issue_date = Column(String(32))
    expiration_date = Column(String(32))
    bond_amount = Column(Integer)
    insurance_required = Column(Boolean)
    disciplinary_count = Column(Integer, default=0)
    raw_response = Column(Text)  # full JSON for audit
    from_cache = Column(Boolean, default=False)
    # Pass-through column from input CSV
    contractor_id = Column(String(128), index=True)


Base.metadata.create_all(engine)


def save_verification(
    state: str,
    license_number: str,
    api_result: dict,
    contractor_id: str | None = None,
) -> VerificationRecord:
    session = Session()
    try:
        record = VerificationRecord(
            state=state,
            license_number=license_number,
            business_name=api_result.get("business_name", ""),
            license_type=api_result.get("license_type", ""),
            classification=api_result.get("classification", ""),
            status=api_result.get("status", "UNKNOWN"),
            issue_date=api_result.get("issue_date"),
            expiration_date=api_result.get("expiration_date"),
            bond_amount=api_result.get("bond_amount"),
            insurance_required=api_result.get("insurance_required", False),
            disciplinary_count=len(api_result.get("disciplinary_actions", [])),
            raw_response=json.dumps(api_result),
            from_cache=api_result.get("_from_cache", False),
            contractor_id=contractor_id,
        )
        session.add(record)
        session.commit()
        return record
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
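Once records accumulate, the audit question you will field most often is "what is the latest verification per license?" A sketch of that query using stdlib sqlite3 against a simplified version of the verifications table (schema trimmed to three columns for the demo; SQLite's bare-column-with-MAX behavior returns values from the max row):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE verifications (
    id INTEGER PRIMARY KEY, verified_at TEXT, license_number TEXT, status TEXT)""")
rows = [
    ("2024-01-01T00:00:00", "1052183", "ACTIVE"),
    ("2024-06-01T00:00:00", "1052183", "SUSPENDED"),
]
conn.executemany(
    "INSERT INTO verifications (verified_at, license_number, status) VALUES (?, ?, ?)",
    rows,
)

# Latest verification per license: the record regulators usually ask for first.
latest = conn.execute("""
    SELECT license_number, status, MAX(verified_at)
    FROM verifications GROUP BY license_number
""").fetchall()
print(latest)  # [('1052183', 'SUSPENDED', '2024-06-01T00:00:00')]
```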
Retry Logic with Exponential Backoff
Rate limit responses (429) should trigger an exponential backoff retry, not an immediate failure. The following decorator works with any function that raises RuntimeError containing "Rate limited":
import time
import functools
import random


def with_retry(max_attempts: int = 4, base_delay: float = 2.0):
    """
    Decorator for exponential backoff on rate limit errors.
    Waits: 2s, 4s, 8s (+ jitter) before giving up.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except RuntimeError as e:
                    if "Rate limited" not in str(e) or attempt == max_attempts:
                        raise
                    wait = base_delay ** attempt + random.uniform(0, 1)
                    print(f"  Attempt {attempt} rate limited, retrying in {wait:.1f}s...")
                    time.sleep(wait)
            raise RuntimeError("Max retry attempts exceeded")  # unreachable; satisfies type checkers
        return wrapper
    return decorator


@with_retry(max_attempts=4)
def verify_contractor_with_retry(state: str, license_number: str) -> dict:
    return verify_contractor(state, license_number)
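You can exercise the retry behavior without touching the network by decorating a stub that fails twice and then succeeds. The snippet below inlines a compact copy of the decorator (with a tiny base_delay so it runs instantly); flaky and the call counter are illustrative:

```python
import functools
import random
import time

def with_retry(max_attempts=4, base_delay=0.01):  # compact copy of the decorator above
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except RuntimeError as e:
                    if "Rate limited" not in str(e) or attempt == max_attempts:
                        raise
                    time.sleep(base_delay ** attempt + random.uniform(0, 0.001))
        return wrapper
    return decorator

calls = {"n": 0}

@with_retry(max_attempts=4, base_delay=0.01)
def flaky():
    # Simulates two rate-limit responses followed by a success.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("Rate limited. Retry after 1s")
    return {"status": "ACTIVE"}

print(flaky(), calls["n"])  # succeeds on the third attempt
```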
Full Pipeline Script
This is the complete, production-ready script. It reads a CSV, verifies in async batches with caching, writes every result to SQLite, and generates a compliance report:
#!/usr/bin/env python3
"""
ContractorVerify full pipeline.

Usage:
    python verify_pipeline.py --input contractors.csv --output report.csv

CSV input format: contractor_id,state,license_number,name
"""
import asyncio
import csv
import json
import os
import sys
from datetime import date, datetime, timezone

import aiohttp
import redis
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv()
API_KEY = os.environ["CONTRACTORVERIFY_API_KEY"]
BASE_URL = "https://api.contractorverify.io/v1"
BATCH_SIZE = 100
CONCURRENT_BATCHES = 3
CACHE_TTL = 86400

redis_client = redis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379/0"))
engine = create_engine(
    os.environ.get("DATABASE_URL", "sqlite:///contractor_verification.db")
)


def is_compliant(result: dict) -> bool:
    if result.get("status") != "ACTIVE":
        return False
    exp = result.get("expiration_date")
    if exp:
        try:
            if datetime.fromisoformat(exp).date() < date.today():
                return False
        except ValueError:
            pass
    return True
async def run_pipeline(input_path: str, output_path: str):
    contractors = []
    with open(input_path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            contractors.append(row)
    print(f"Loaded {len(contractors)} contractors from {input_path}")

    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }
    semaphore = asyncio.Semaphore(CONCURRENT_BATCHES)
    all_results = []

    def get_cache(state, lic):
        key = f"cv:{state.upper()}:{lic.strip().upper()}"
        v = redis_client.get(key)
        return json.loads(v) if v else None

    def set_cache(state, lic, data):
        key = f"cv:{state.upper()}:{lic.strip().upper()}"
        redis_client.setex(key, CACHE_TTL, json.dumps(data))

    uncached_batches = []
    cached_results = []
    current_batch = []
    for c in contractors:
        cached = get_cache(c["state"], c["license_number"])
        if cached:
            cached["_from_cache"] = True
            cached["_contractor_id"] = c.get("contractor_id", "")
            cached_results.append(cached)
        else:
            current_batch.append(c)
            if len(current_batch) == BATCH_SIZE:
                uncached_batches.append(current_batch)
                current_batch = []
    if current_batch:
        uncached_batches.append(current_batch)
    print(f"Cache hits: {len(cached_results)}, API batches needed: {len(uncached_batches)}")
    async def fetch_batch(session, batch):
        async with semaphore:
            payload = [
                {"state": c["state"], "license_number": c["license_number"]}
                for c in batch
            ]
            async with session.post(
                f"{BASE_URL}/verify/batch",
                headers=headers,
                json={"licenses": payload},
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status == 429:
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    await asyncio.sleep(retry_after)
                resp.raise_for_status()
                data = await resp.json()
                results = data.get("results", [])
                for j, r in enumerate(results):
                    r["_from_cache"] = False
                    r["_contractor_id"] = batch[j].get("contractor_id", "")
                    set_cache(batch[j]["state"], batch[j]["license_number"], r)
                return results
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_batch(session, b) for b in uncached_batches]
        fetched = await asyncio.gather(*tasks, return_exceptions=True)

    api_results = []
    for item in fetched:
        if isinstance(item, Exception):
            print(f"Batch error: {item}", file=sys.stderr)
        else:
            api_results.extend(item)
    all_results = cached_results + api_results
    # Write to DB and generate report
    with engine.connect() as conn:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS verifications (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                verified_at TEXT,
                contractor_id TEXT,
                state TEXT,
                license_number TEXT,
                business_name TEXT,
                status TEXT,
                expiration_date TEXT,
                compliant INTEGER,
                disciplinary_count INTEGER,
                from_cache INTEGER,
                raw_response TEXT
            )
        """))
        conn.commit()
        for r in all_results:
            conn.execute(text("""
                INSERT INTO verifications
                    (verified_at, contractor_id, state, license_number, business_name,
                     status, expiration_date, compliant, disciplinary_count, from_cache, raw_response)
                VALUES (:ts, :cid, :state, :lic, :biz, :status, :exp, :ok, :disc, :cache, :raw)
            """), {
                "ts": datetime.now(timezone.utc).isoformat(),
                "cid": r.get("_contractor_id", ""),
                "state": r.get("state", ""),
                "lic": r.get("license_number", ""),
                "biz": r.get("business_name", ""),
                "status": r.get("status", "UNKNOWN"),
                "exp": r.get("expiration_date"),
                "ok": 1 if is_compliant(r) else 0,
                "disc": len(r.get("disciplinary_actions", [])),
                "cache": 1 if r.get("_from_cache") else 0,
                "raw": json.dumps(r),
            })
        conn.commit()
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=[
            "contractor_id", "state", "license_number", "business_name",
            "status", "expiration_date", "compliant", "disciplinary_count", "from_cache",
        ])
        writer.writeheader()
        for r in all_results:
            writer.writerow({
                "contractor_id": r.get("_contractor_id", ""),
                "state": r.get("state", ""),
                "license_number": r.get("license_number", ""),
                "business_name": r.get("business_name", ""),
                "status": r.get("status", "UNKNOWN"),
                "expiration_date": r.get("expiration_date", ""),
                "compliant": "YES" if is_compliant(r) else "NO",
                "disciplinary_count": len(r.get("disciplinary_actions", [])),
                "from_cache": "YES" if r.get("_from_cache") else "NO",
            })

    compliant = sum(1 for r in all_results if is_compliant(r))
    print("\n--- Compliance Report ---")
    print(f"Total verified: {len(all_results)}")
    print(f"Compliant (ACTIVE, not expired): {compliant}")
    print(f"Non-compliant: {len(all_results) - compliant}")
    print(f"Report written to: {output_path}")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", default="compliance_report.csv")
    args = parser.parse_args()
    asyncio.run(run_pipeline(args.input, args.output))
Running the Pipeline
Prepare your input CSV with a header row: contractor_id,state,license_number,name. The contractor_id column is your internal identifier - it is passed through to the report verbatim so you can join the compliance output back to your own database without relying on license numbers as a foreign key.
Run the pipeline:
python verify_pipeline.py --input contractors.csv --output compliance_report.csv
On subsequent runs against the same contractor list within 24 hours, the Redis cache layer will serve the majority of lookups without hitting the API. This makes daily compliance sweeps cheap - the first run pays for the API calls, subsequent same-day runs are nearly free.
For scheduled compliance sweeps, add this to cron or a task scheduler, then pipe the output report to your notification system. Any contractor flipping from compliant=YES to compliant=NO between runs is an alert that warrants immediate action - license expired, suspended, or revoked.
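Detecting those flips can be as simple as diffing the compliant column of consecutive reports keyed by contractor_id. A hedged sketch (compliance_flips is a hypothetical helper, not part of the pipeline; it reads the same CSV columns the report writes):

```python
import csv
import io

def compliance_flips(prev_csv: str, curr_csv: str) -> list[str]:
    # Return contractor_ids that went from compliant=YES to compliant=NO.
    def index(text):
        return {r["contractor_id"]: r["compliant"]
                for r in csv.DictReader(io.StringIO(text))}
    prev, curr = index(prev_csv), index(curr_csv)
    return [cid for cid, status in curr.items()
            if status == "NO" and prev.get(cid) == "YES"]

prev = "contractor_id,compliant\nC-1,YES\nC-2,YES\n"
curr = "contractor_id,compliant\nC-1,YES\nC-2,NO\n"
print(compliance_flips(prev, curr))  # ['C-2']
```

In a scheduled job you would read the two most recent report files from disk instead of inline strings, then route the flip list to your notification system.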
For deeper guidance on running large batch jobs and interpreting freshness windows in the API response, see How to Run Batch Contractor License Checks via the API and Contractor License Data Freshness - Real-Time vs. Cached.
Automate Your License Verification
The ContractorVerify API handles the state-by-state normalization, credential lookups, and data freshness so your team doesn't have to. Drop a CSV in, get a compliance report out. Join the waitlist to get API access when we open the beta.