License verification is a blocking requirement before any contractor touches a job. Done manually, a compliance team member visits a state board website, types in a license number, screenshots the result, and pastes it into a spreadsheet. At scale - say, onboarding 500 new contractors a month - that process breaks down fast. The ContractorVerify API replaces that manual workflow with a single HTTP call that returns structured, normalized data from every major state licensing board.

This tutorial walks through a complete Python integration: from your first authenticated request to a production-ready pipeline that reads a CSV, verifies in async batches, caches results in Redis, and writes everything to a SQLite database with a compliance summary report at the end.

Prerequisites

You need Python 3.9 or higher. The tutorial uses f-strings, | union type hints, and asyncio.run(), all of which require 3.9+. Install the required packages:

pip install requests aiohttp python-dotenv redis sqlalchemy

Create a .env file in your project root. Never hardcode credentials in source files:

# .env
CONTRACTORVERIFY_API_KEY=your_api_key_here
REDIS_URL=redis://localhost:6379/0
DATABASE_URL=sqlite:///contractor_verification.db

Load these at the top of every script:

from dotenv import load_dotenv
import os

load_dotenv()

API_KEY = os.environ["CONTRACTORVERIFY_API_KEY"]
BASE_URL = "https://api.contractorverify.io/v1"
Security note: Add .env to your .gitignore before your first commit. Leaked API keys are a common source of unexpected billing and data exposure. Use environment variables or a secrets manager (AWS Secrets Manager, HashiCorp Vault) in production deployments.

Single Contractor Lookup

The GET /verify endpoint accepts a state code and license number. This is the simplest integration - one call, one result.

import requests
import os
from dotenv import load_dotenv

load_dotenv()
API_KEY = os.environ["CONTRACTORVERIFY_API_KEY"]
BASE_URL = "https://api.contractorverify.io/v1"

def verify_contractor(state: str, license_number: str) -> dict:
    """
    Look up a single contractor license.
    Returns the parsed response dict on success.
    Raises ValueError on 404/422, RuntimeError on server errors.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Accept": "application/json",
    }
    params = {
        "state": state.upper(),
        "license_number": license_number.strip(),
    }

    response = requests.get(
        f"{BASE_URL}/verify",
        headers=headers,
        params=params,
        timeout=10,
    )

    if response.status_code == 404:
        raise ValueError(f"License {license_number} not found in {state}")
    if response.status_code == 422:
        detail = response.json().get("detail", "Validation error")
        raise ValueError(f"Invalid request: {detail}")
    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 60))
        raise RuntimeError(f"Rate limited. Retry after {retry_after}s")
    if not response.ok:
        raise RuntimeError(f"API error {response.status_code}: {response.text}")

    return response.json()


# Example usage
if __name__ == "__main__":
    result = verify_contractor("CA", "1052183")
    print(result)

Parsing the Response

A successful response contains all the fields you need to make a compliance decision. Here is what each field means and how to use it:

Field Type Description
license_numberstringNormalized license number as returned by the state board
license_typestringState-specific license type code (e.g., "B", "C-10", "RMO")
classificationstringHuman-readable trade description (e.g., "General Building", "Electrical")
statusstringOne of: ACTIVE, EXPIRED, SUSPENDED, REVOKED, CANCELLED, PENDING
issue_datestring (ISO 8601)Date the license was originally issued
expiration_datestring (ISO 8601)Date the license expires - key field for compliance checks
bond_amountinteger | nullBond amount in USD cents, null if not required or not on file
disciplinary_actionsarrayList of disciplinary action objects with date, type, and description
business_namestringDBA or legal entity name on file with the state board
addressobjectStreet, city, state, zip as reported to the board
insurance_requiredbooleanWhether the license type requires proof of insurance on file

Parse the response into a typed dataclass for downstream processing:

from dataclasses import dataclass, field
from datetime import date, datetime
from typing import Optional

@dataclass
class DisciplinaryAction:
    action_date: str
    action_type: str
    description: str

@dataclass
class ContractorRecord:
    license_number: str
    license_type: str
    classification: str
    status: str
    issue_date: Optional[str]
    expiration_date: Optional[str]
    bond_amount: Optional[int]
    disciplinary_actions: list[DisciplinaryAction]
    business_name: str
    address: dict
    insurance_required: bool

    @classmethod
    def from_api(cls, data: dict) -> "ContractorRecord":
        actions = [
            DisciplinaryAction(
                action_date=a.get("date", ""),
                action_type=a.get("type", ""),
                description=a.get("description", ""),
            )
            for a in data.get("disciplinary_actions", [])
        ]
        return cls(
            license_number=data["license_number"],
            license_type=data["license_type"],
            classification=data["classification"],
            status=data["status"],
            issue_date=data.get("issue_date"),
            expiration_date=data.get("expiration_date"),
            bond_amount=data.get("bond_amount"),
            disciplinary_actions=actions,
            business_name=data.get("business_name", ""),
            address=data.get("address", {}),
            insurance_required=data.get("insurance_required", False),
        )

    def is_compliant(self) -> bool:
        """Returns True only if license is ACTIVE and not expired."""
        if self.status != "ACTIVE":
            return False
        if self.expiration_date:
            exp = datetime.fromisoformat(self.expiration_date).date()
            if exp < date.today():
                return False
        return True

Error Handling

Three HTTP status codes require special handling in contractor verification workflows:

404 Not Found - the license number does not exist in the state database. This is not necessarily fraud - the contractor may have used a different entity name, the license may be under an RMO (Responsible Managing Officer) rather than the business, or they may have the wrong state. Log the miss and flag for manual follow-up rather than hard-blocking.

422 Unprocessable Entity - your request was malformed. Common causes: invalid state abbreviation (use two-letter USPS codes), license number containing characters the state doesn't use, or a missing required parameter. The response body will contain a detail field with specifics.

429 Too Many Requests - you have exceeded your plan's rate limit. The response includes a Retry-After header with seconds to wait. Always respect this header rather than polling - continued requests during a rate limit window can result in temporary IP blocks.

Batch Verification

For bulk onboarding - say, verifying a roster of 2,000 subcontractors before a large project - the POST /verify/batch endpoint accepts up to 100 license lookups per request. The following function reads a CSV, chunks it into batches of 100, and collects results:

import csv
import time
import requests
from pathlib import Path

BATCH_SIZE = 100
BATCH_DELAY_SECONDS = 1.5  # be polite between sequential batches

def load_contractors_csv(path: str) -> list[dict]:
    """
    Expects CSV with columns: state, license_number
    Optional columns: contractor_id, name
    """
    contractors = []
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            contractors.append({
                "state": row["state"].strip().upper(),
                "license_number": row["license_number"].strip(),
                # Pass through any extra metadata for correlation
                "_meta": {k: v for k, v in row.items()
                           if k not in ("state", "license_number")},
            })
    return contractors


def chunk(lst: list, size: int):
    """Yield successive chunks of `size` from lst."""
    for i in range(0, len(lst), size):
        yield lst[i : i + size]


def batch_verify(contractors: list[dict]) -> list[dict]:
    """
    Send contractors to the batch endpoint in chunks of BATCH_SIZE.
    Returns a flat list of result dicts.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    results = []

    batches = list(chunk(contractors, BATCH_SIZE))
    print(f"Processing {len(contractors)} contractors in {len(batches)} batches...")

    for i, batch in enumerate(batches, start=1):
        print(f"  Batch {i}/{len(batches)} ({len(batch)} records)...")

        # Strip internal metadata before sending to API
        payload = [
            {"state": c["state"], "license_number": c["license_number"]}
            for c in batch
        ]

        response = requests.post(
            f"{BASE_URL}/verify/batch",
            headers=headers,
            json={"licenses": payload},
            timeout=30,
        )

        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            print(f"  Rate limited - waiting {retry_after}s...")
            time.sleep(retry_after)
            # Retry this batch once
            response = requests.post(
                f"{BASE_URL}/verify/batch",
                headers=headers,
                json={"licenses": payload},
                timeout=30,
            )

        if not response.ok:
            raise RuntimeError(
                f"Batch {i} failed with status {response.status_code}: {response.text}"
            )

        batch_results = response.json().get("results", [])

        # Re-attach metadata for downstream correlation
        for j, result in enumerate(batch_results):
            result["_meta"] = batch[j].get("_meta", {})

        results.extend(batch_results)

        if i < len(batches):
            time.sleep(BATCH_DELAY_SECONDS)

    return results

Async Processing with aiohttp

The synchronous batch loop above is fast enough for most use cases, but if you need to verify thousands of contractors and can tolerate concurrent I/O, aiohttp lets you fire multiple batch requests in flight simultaneously. The key constraint: respect the rate limit by capping concurrency with a semaphore.

import asyncio
import aiohttp

CONCURRENT_BATCHES = 5  # tune based on your plan's rate limit

async def async_batch_verify(contractors: list[dict]) -> list[dict]:
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    semaphore = asyncio.Semaphore(CONCURRENT_BATCHES)
    results = []

    async def verify_one_batch(session, batch_index, batch):
        async with semaphore:
            payload = [
                {"state": c["state"], "license_number": c["license_number"]}
                for c in batch
            ]
            async with session.post(
                f"{BASE_URL}/verify/batch",
                headers=headers,
                json={"licenses": payload},
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status == 429:
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    await asyncio.sleep(retry_after)
                    # bubble up for retry - simplified; add retry loop in prod
                    raise RuntimeError(f"Rate limited on batch {batch_index}")
                resp.raise_for_status()
                data = await resp.json()
                batch_results = data.get("results", [])
                for j, result in enumerate(batch_results):
                    result["_meta"] = batch[j].get("_meta", {})
                return batch_results

    batches = list(chunk(contractors, BATCH_SIZE))

    async with aiohttp.ClientSession() as session:
        tasks = [
            verify_one_batch(session, i, batch)
            for i, batch in enumerate(batches, start=1)
        ]
        batch_results_list = await asyncio.gather(*tasks, return_exceptions=True)

    for item in batch_results_list:
        if isinstance(item, Exception):
            print(f"Batch error: {item}")
        else:
            results.extend(item)

    return results

Redis Caching Layer

License data does not change by the minute. An ACTIVE license that was current at 9am is almost certainly still current at 3pm. A 24-hour cache dramatically reduces API costs for workflows that re-check the same contractors across multiple runs - daily compliance sweeps, for example, or webhook-triggered re-checks that may fire multiple times for the same contractor.

import redis
import json
import hashlib

redis_client = redis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379/0"))
CACHE_TTL_SECONDS = 86400  # 24 hours

def cache_key(state: str, license_number: str) -> str:
    raw = f"{state.upper()}:{license_number.strip().upper()}"
    return f"cv:license:{hashlib.sha256(raw.encode()).hexdigest()[:16]}:{raw}"

def get_cached(state: str, license_number: str) -> dict | None:
    key = cache_key(state, license_number)
    cached = redis_client.get(key)
    if cached:
        return json.loads(cached)
    return None

def set_cached(state: str, license_number: str, data: dict) -> None:
    key = cache_key(state, license_number)
    redis_client.setex(key, CACHE_TTL_SECONDS, json.dumps(data))

def verify_with_cache(state: str, license_number: str) -> dict:
    cached = get_cached(state, license_number)
    if cached:
        cached["_from_cache"] = True
        return cached
    result = verify_contractor(state, license_number)
    set_cached(state, license_number, result)
    result["_from_cache"] = False
    return result
Cache invalidation strategy: For disciplinary action monitoring, you may want a shorter TTL (2-4 hours) or a forced cache bust when a contractor's record has any disciplinary_actions entries. Suspended licenses can become active again; the cache should not hide that transition from your compliance system.

Database Persistence with SQLAlchemy

Raw API results belong in a database, not just a Redis cache. The cache is a speed layer; the database is your audit trail. Regulators, insurers, and platform trust-and-safety teams may all ask to see verification records with timestamps.

from sqlalchemy import create_engine, Column, String, Boolean, Integer, Text, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime, timezone
import json

DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///contractor_verification.db")
engine = create_engine(DATABASE_URL, echo=False)
Session = sessionmaker(bind=engine)
Base = declarative_base()

class VerificationRecord(Base):
    __tablename__ = "verifications"

    id = Column(Integer, primary_key=True, autoincrement=True)
    verified_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    state = Column(String(2), nullable=False, index=True)
    license_number = Column(String(64), nullable=False, index=True)
    business_name = Column(String(256))
    license_type = Column(String(64))
    classification = Column(String(256))
    status = Column(String(32), nullable=False)
    issue_date = Column(String(32))
    expiration_date = Column(String(32))
    bond_amount = Column(Integer)
    insurance_required = Column(Boolean)
    disciplinary_count = Column(Integer, default=0)
    raw_response = Column(Text)  # full JSON for audit
    from_cache = Column(Boolean, default=False)

    # Pass-through columns from input CSV
    contractor_id = Column(String(128), index=True)


Base.metadata.create_all(engine)


def save_verification(
    state: str,
    license_number: str,
    api_result: dict,
    contractor_id: str | None = None,
) -> VerificationRecord:
    session = Session()
    try:
        record = VerificationRecord(
            state=state,
            license_number=license_number,
            business_name=api_result.get("business_name", ""),
            license_type=api_result.get("license_type", ""),
            classification=api_result.get("classification", ""),
            status=api_result.get("status", "UNKNOWN"),
            issue_date=api_result.get("issue_date"),
            expiration_date=api_result.get("expiration_date"),
            bond_amount=api_result.get("bond_amount"),
            insurance_required=api_result.get("insurance_required", False),
            disciplinary_count=len(api_result.get("disciplinary_actions", [])),
            raw_response=json.dumps(api_result),
            from_cache=api_result.get("_from_cache", False),
            contractor_id=contractor_id,
        )
        session.add(record)
        session.commit()
        return record
    finally:
        session.close()

Retry Logic with Exponential Backoff

Rate limit responses (429) should trigger an exponential backoff retry, not an immediate failure. The following decorator works with any function that raises RuntimeError containing "Rate limited":

import time
import functools
import random

def with_retry(max_attempts: int = 4, base_delay: float = 2.0):
    """
    Decorator for exponential backoff on rate limit errors.
    Waits: 2s, 4s, 8s (+ jitter) before giving up.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except RuntimeError as e:
                    if "Rate limited" not in str(e) or attempt == max_attempts:
                        raise
                    wait = base_delay ** attempt + random.uniform(0, 1)
                    print(f"  Attempt {attempt} rate limited, retrying in {wait:.1f}s...")
                    time.sleep(wait)
            raise RuntimeError("Max retry attempts exceeded")
        return wrapper
    return decorator

@with_retry(max_attempts=4)
def verify_contractor_with_retry(state: str, license_number: str) -> dict:
    return verify_contractor(state, license_number)

Full Pipeline Script

This is the complete, production-ready script. It reads a CSV, verifies in async batches with caching, writes every result to SQLite, and generates a compliance report:

#!/usr/bin/env python3
"""
ContractorVerify full pipeline.

Usage:
    python verify_pipeline.py --input contractors.csv --output report.csv

CSV input format: contractor_id,state,license_number,name
"""

import asyncio
import csv
import json
import os
import sys
from datetime import date, datetime, timezone
from pathlib import Path

import aiohttp
import redis
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv()
API_KEY = os.environ["CONTRACTORVERIFY_API_KEY"]
BASE_URL = "https://api.contractorverify.io/v1"
BATCH_SIZE = 100
CONCURRENT_BATCHES = 3
CACHE_TTL = 86400

redis_client = redis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379/0"))
engine = create_engine(
    os.environ.get("DATABASE_URL", "sqlite:///contractor_verification.db")
)

def is_compliant(result: dict) -> bool:
    if result.get("status") != "ACTIVE":
        return False
    exp = result.get("expiration_date")
    if exp:
        try:
            if datetime.fromisoformat(exp).date() < date.today():
                return False
        except ValueError:
            pass
    return True

async def run_pipeline(input_path: str, output_path: str):
    contractors = []
    with open(input_path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            contractors.append(row)

    print(f"Loaded {len(contractors)} contractors from {input_path}")

    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }
    semaphore = asyncio.Semaphore(CONCURRENT_BATCHES)
    all_results = []

    def get_cache(state, lic):
        key = f"cv:{state.upper()}:{lic.strip().upper()}"
        v = redis_client.get(key)
        return json.loads(v) if v else None

    def set_cache(state, lic, data):
        key = f"cv:{state.upper()}:{lic.strip().upper()}"
        redis_client.setex(key, CACHE_TTL, json.dumps(data))

    uncached_batches = []
    cached_results = []

    current_batch = []
    for c in contractors:
        cached = get_cache(c["state"], c["license_number"])
        if cached:
            cached["_from_cache"] = True
            cached["_contractor_id"] = c.get("contractor_id", "")
            cached_results.append(cached)
        else:
            current_batch.append(c)
            if len(current_batch) == BATCH_SIZE:
                uncached_batches.append(current_batch)
                current_batch = []
    if current_batch:
        uncached_batches.append(current_batch)

    print(f"Cache hits: {len(cached_results)}, API batches needed: {len(uncached_batches)}")

    async def fetch_batch(session, batch):
        async with semaphore:
            payload = [
                {"state": c["state"], "license_number": c["license_number"]}
                for c in batch
            ]
            async with session.post(
                f"{BASE_URL}/verify/batch",
                headers=headers,
                json={"licenses": payload},
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status == 429:
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    await asyncio.sleep(retry_after)
                    resp.raise_for_status()
                resp.raise_for_status()
                data = await resp.json()
                results = data.get("results", [])
                for j, r in enumerate(results):
                    r["_from_cache"] = False
                    r["_contractor_id"] = batch[j].get("contractor_id", "")
                    set_cache(batch[j]["state"], batch[j]["license_number"], r)
                return results

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_batch(session, b) for b in uncached_batches]
        fetched = await asyncio.gather(*tasks, return_exceptions=True)

    api_results = []
    for item in fetched:
        if isinstance(item, Exception):
            print(f"Batch error: {item}", file=sys.stderr)
        else:
            api_results.extend(item)

    all_results = cached_results + api_results

    # Write to DB and generate report
    with engine.connect() as conn:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS verifications (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                verified_at TEXT,
                contractor_id TEXT,
                state TEXT,
                license_number TEXT,
                business_name TEXT,
                status TEXT,
                expiration_date TEXT,
                compliant INTEGER,
                disciplinary_count INTEGER,
                from_cache INTEGER,
                raw_response TEXT
            )
        """))
        conn.commit()

        for r in all_results:
            conn.execute(text("""
                INSERT INTO verifications
                (verified_at, contractor_id, state, license_number, business_name,
                 status, expiration_date, compliant, disciplinary_count, from_cache, raw_response)
                VALUES (:ts, :cid, :state, :lic, :biz, :status, :exp, :ok, :disc, :cache, :raw)
            """), {
                "ts": datetime.now(timezone.utc).isoformat(),
                "cid": r.get("_contractor_id", ""),
                "state": r.get("state", ""),
                "lic": r.get("license_number", ""),
                "biz": r.get("business_name", ""),
                "status": r.get("status", "UNKNOWN"),
                "exp": r.get("expiration_date"),
                "ok": 1 if is_compliant(r) else 0,
                "disc": len(r.get("disciplinary_actions", [])),
                "cache": 1 if r.get("_from_cache") else 0,
                "raw": json.dumps(r),
            })
        conn.commit()

    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=[
            "contractor_id", "state", "license_number", "business_name",
            "status", "expiration_date", "compliant", "disciplinary_count", "from_cache",
        ])
        writer.writeheader()
        for r in all_results:
            writer.writerow({
                "contractor_id": r.get("_contractor_id", ""),
                "state": r.get("state", ""),
                "license_number": r.get("license_number", ""),
                "business_name": r.get("business_name", ""),
                "status": r.get("status", "UNKNOWN"),
                "expiration_date": r.get("expiration_date", ""),
                "compliant": "YES" if is_compliant(r) else "NO",
                "disciplinary_count": len(r.get("disciplinary_actions", [])),
                "from_cache": "YES" if r.get("_from_cache") else "NO",
            })

    compliant = sum(1 for r in all_results if is_compliant(r))
    print(f"\n--- Compliance Report ---")
    print(f"Total verified: {len(all_results)}")
    print(f"Compliant (ACTIVE, not expired): {compliant}")
    print(f"Non-compliant: {len(all_results) - compliant}")
    print(f"Report written to: {output_path}")

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", default="compliance_report.csv")
    args = parser.parse_args()
    asyncio.run(run_pipeline(args.input, args.output))

Running the Pipeline

Prepare your input CSV with a header row: contractor_id,state,license_number,name. The contractor_id column is your internal identifier - it is passed through to the report verbatim so you can join the compliance output back to your own database without relying on license numbers as a foreign key.

Run the pipeline:

python verify_pipeline.py --input contractors.csv --output compliance_report.csv

On subsequent runs against the same contractor list within 24 hours, the Redis cache layer will serve the majority of lookups without hitting the API. This makes daily compliance sweeps cheap - the first run pays for the API calls, subsequent same-day runs are nearly free.

For scheduled compliance sweeps, add this to cron or a task scheduler, then pipe the output report to your notification system. Any contractor flipping from compliant=YES to compliant=NO between runs is an alert that warrants immediate action - license expired, suspended, or revoked.

For deeper guidance on running large batch jobs and interpreting freshness windows in the API response, see How to Run Batch Contractor License Checks via the API and Contractor License Data Freshness - Real-Time vs. Cached.

> Automate Your License Verification_

The ContractorVerify API handles the state-by-state normalization, credential lookups, and data freshness so your team doesn't have to. Drop a CSV in, get a compliance report out. Join the waitlist to get API access when we open the beta.

Join the Waitlist