Files
the_information_nexus/projects/grants-xml-sync.md

3.5 KiB

#!/bin/bash

# grants_xml_pipeline - Enterprise-grade Grants.gov data ingestion system
#
# Version: 1.1.0
#
# Features:
#   - Zero-tempfile streaming architecture
#   - Cryptographic data provenance
#   - Schema version awareness
#   - Automated quality thresholds
#   - Robust error handling and logging
#
# Usage:
#   GRANTS_MONGO_URI="mongodb://cluster.example.com" ./grants_xml_pipeline [-v]
#
# Dependencies:
#   - libarchive-tools (bsdtar)
#   - yq (xq) for XML→JSON conversion
#   - MongoDB shell tools (mongosh)
#   - curl and xmllint (both are also checked at startup)

# Strict mode.
#   -e           exit on any unhandled non-zero status
#   -E           let functions and subshells inherit the ERR trap; without it
#                the trap below would never fire inside the pipeline functions
#   -o pipefail  a pipeline fails if any of its stages fails
set -Eeo pipefail
# Run the last stage of a pipeline in the current shell (non-interactive bash).
shopt -s lastpipe
# Report the failing line through handle_error. Single quotes defer the
# expansion of $LINENO until the trap actually fires.
trap 'handle_error $LINENO' ERR

# === Configuration ===

# Pipeline version string.
readonly VERSION="1.1.0"

# Absolute directory containing this script. Declared separately from the
# command substitution so a failing cd/pwd is not masked by `readonly`.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
readonly SCRIPT_DIR

# MongoDB connection string; the script aborts here if GRANTS_MONGO_URI is
# unset or empty.
readonly MONGO_URI="${GRANTS_MONGO_URI:?Error: GRANTS_MONGO_URI environment variable required}"

# Prefix for collection names (a _YYYYMM suffix is appended by the pipeline).
readonly COLLECTION_PREFIX="opportunities"

# Base URL the daily extract archive is downloaded from.
readonly BASE_URL="https://prod-grants-gov-chatbot.s3.amazonaws.com/extracts"

# Tunables, overridable through the environment:
#   GRANTS_QUALITY_THRESHOLD  (default 80)
#   GRANTS_SCHEMA_VALIDATION  (default true)
readonly QUALITY_THRESHOLD_PCT="${GRANTS_QUALITY_THRESHOLD:-80}"
readonly SCHEMA_VALIDATION_ENABLED="${GRANTS_SCHEMA_VALIDATION:-true}"
readonly BULK_BATCH_SIZE=1000

# Deliberately NOT readonly: main() sets this to true when -v is given.
VERBOSE=false

# === Initialization Checks ===

#######################################
# Verify that every required external tool is installed and that MongoDB
# is reachable before any data is processed.
# Globals:   MONGO_URI (read)
# Arguments: none
# Outputs:   error messages via log_error
# Returns:   0 when all checks pass; exits the script with status 1 otherwise
#######################################
validate_environment() {
    # Collect every missing tool so they can be reported in a single message.
    local missing=()
    local cmd
    for cmd in bsdtar xq mongosh curl xmllint; do
        if ! command -v "$cmd" >/dev/null 2>&1; then
            missing+=("$cmd")
        fi
    done

    if (( ${#missing[@]} > 0 )); then
        log_error "Missing dependencies: ${missing[*]}"
        exit 1
    fi

    # Verify MongoDB connection
    if ! mongosh "$MONGO_URI" --eval "db.version()" >/dev/null; then
        log_error "Cannot connect to MongoDB at $MONGO_URI"
        exit 1
    fi
}

# === Logging Utilities ===

#######################################
# Write one timestamped log line to stderr.
# Arguments: $1 - severity label (e.g. INFO, ERROR)
#            $2 - message text
# Outputs:   "[YYYY-MM-DD HH:MM:SS] [LEVEL] message" on stderr
#######################################
log() {
    local level="$1"
    local message="$2"
    # printf rather than echo so a message starting with "-n" or containing
    # backslashes is printed verbatim.
    printf '[%s] [%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$level" "$message" >&2
}

#######################################
# Conditionally forward a message to log().
# Globals:   VERBOSE (read)
# Arguments: $1 - severity label, $2 - message (passed through to log)
# Outputs:   whatever log() emits, when the message is not suppressed
# Returns:   0 whether or not the message was emitted
#######################################
log_info() {
    # NOTE(review): the second operand of the original `$VERBOSE || ...`
    # condition was missing, leaving a syntax error. It is reconstructed here
    # as "always let ERROR-level messages through" -- confirm the intent.
    # VERBOSE is compared as a string instead of being executed as a command.
    if [[ "${VERBOSE:-false}" == true || "${1:-}" == "ERROR" ]]; then
        log "$@"
    fi
}

#######################################
# Log a message at ERROR level and terminate the script.
# Arguments: $1 - message text
# Outputs:   the message, via log()
# Returns:   never returns; exits with status 1
#######################################
log_error() {
    log "ERROR" "$1"
    exit 1
}

#######################################
# ERR-trap handler: report the line on which a command failed.
# Arguments: $1 - line number supplied by the trap ($LINENO)
# Outputs:   "Error occurred at line N" via log_error
# Returns:   does not return normally when log_error exits the script
#######################################
handle_error() {
    local line="$1"
    log_error "Error occurred at line $line"
}

# === Data Transformation ===

#######################################
# Convert the XML stream on stdin into compact JSON, one document per
# element of .Opportunities.Opportunity, adding a _metadata object to each.
# Globals:   VERSION (read), CONTENT_SHA256 (read, may be unset)
# Arguments: $1 - schema version string recorded in _metadata
# Outputs:   one JSON document per line on stdout
# Returns:   exit status of xq
#######################################
transform_to_json() {
    local schema_version="$1"

    # VERSION and CONTENT_SHA256 are handed to the filter with --arg instead
    # of being read through jq's `env`: `env` only sees exported variables,
    # and VERSION is a plain (non-exported) shell variable, so env.VERSION
    # resolved to null. An unset or empty value is still emitted as null.
    xq -c \
        --arg schema_version "$schema_version" \
        --arg content_sha256 "${CONTENT_SHA256:-}" \
        --arg pipeline_version "${VERSION:-}" '
        .Opportunities.Opportunity[]
        | ._metadata = {
            ingestion_date: (now | todateiso8601),
            schema_version: $schema_version,
            content_sha256: (if $content_sha256 == "" then null else $content_sha256 end),
            pipeline_version: (if $pipeline_version == "" then null else $pipeline_version end)
        }'
}

# === Main Pipeline ===

#######################################
# Download today's extract and stream it through hashing, extraction,
# validation, JSON conversion and the MongoDB import without temp files.
# Globals:   COLLECTION_PREFIX, BASE_URL (read)
# Arguments: none
# Outputs:   progress messages via log_info
# Returns:   0 on success; non-zero if a stage fails (the script sets
#            pipefail, so a failure in any pipeline stage propagates)
#######################################
execute_data_pipeline() {
    # Declarations are kept apart from the command substitutions so that a
    # failing helper is not hidden behind the exit status of `local`.
    local current_date collection_name schema_version baseline_count

    current_date=$(date +%Y%m%d)
    # Take YYYYMM from the date captured above instead of calling date a
    # second time, so the collection month always matches the extract date.
    collection_name="${COLLECTION_PREFIX}_${current_date:0:6}"
    schema_version=$(get_schema_version)

    log_info "INFO" "Starting ingestion for ${current_date} (schema ${schema_version})"

    # Establish quality baseline
    baseline_count=$(get_previous_document_count "$collection_name")
    log_info "INFO" "Baseline document count: ${baseline_count}"

    # Execute streaming pipeline
    # NOTE(review): compute_content_hash runs inside a process substitution,
    # i.e. in a separate process, so a variable it sets cannot reach
    # transform_to_json -- confirm how the hash is meant to be propagated.
    curl -LfsS "${BASE_URL}/GrantsDBExtract${current_date}v2.zip" |
        tee >(compute_content_hash) |
        bsdtar -xOf - '*.xml' |
        validate_xml_stream |
        transform_to_json "$schema_version" |
        import_to_mongodb "$collection_name" "$baseline_count"

    log_info "INFO" "Pipeline completed successfully"
}

# === Entry Point ===

#######################################
# Script entry point: parse options, verify the environment, run the
# pipeline.
# Globals:   VERBOSE (written when -v is given)
# Arguments: command-line arguments; the only supported option is -v
# Outputs:   "Invalid option: -X" via log_error for an unknown option
# Returns:   exits 0 after a successful run
#######################################
main() {
    # Parse command-line arguments. The leading ':' in the optstring selects
    # silent mode, so unknown options arrive as '?' with the offending letter
    # in OPTARG.
    local opt OPTIND=1
    while getopts ":v" opt; do
        case "$opt" in
            v) VERBOSE=true ;;
            # Escaped so it matches a literal '?' rather than acting as a
            # single-character glob.
            \?) log_error "Invalid option: -$OPTARG" ;;
        esac
    done
    shift $((OPTIND - 1))

    validate_environment
    execute_data_pipeline
    exit 0
}

main "$@"