#!/bin/bash
# Grants.gov XML Pipeline
#
# Downloads the daily Grants.gov XML extract, converts every opportunity to a
# JSON document annotated with a `_bi_metadata` sub-document, and bulk-inserts
# the documents into a month-scoped MongoDB collection in batches.
#
# Usage:
#   GRANTS_MONGO_URI="mongodb://host/db" [DISCORD_WEBHOOK="https://..."] \
#     ./grants-xml-sync.sh
#
# Environment:
#   GRANTS_MONGO_URI  (required) connection string handed to mongosh
#   DISCORD_WEBHOOK   (optional) webhook URL; alerts are skipped when empty
#
# Dependencies: curl, bsdtar, xq, jq, mongosh
set -euo pipefail
shopt -s lastpipe

### --- Configuration ---
readonly MONGO_URI="${GRANTS_MONGO_URI:?Required env var}"
readonly DISCORD_WEBHOOK="${DISCORD_WEBHOOK:-}"
readonly BASE_URL="https://prod-grants-gov-chatbot.s3.amazonaws.com/extracts"
# Assigned separately from `readonly` so a failing `date` is not masked.
TODAY=$(date -u +%Y%m%d)
readonly TODAY
# Derived from TODAY (not a second `date` call) so the collection month can
# never disagree with the ingest date around a month boundary.
readonly COLLECTION="grants_${TODAY:0:6}"
readonly LOCKFILE="/tmp/grants_ingest_$TODAY.lock"
readonly BATCH_SIZE=1000
readonly LOG_FILE="/var/log/grants_ingest_$TODAY.log"

# Path of the scratch file holding the current batch (one JSON document per
# line). Created just before main runs; removed by cleanup.
BATCH_FILE=""

### --- Notification System ---
#######################################
# Post a colored embed to the Discord webhook (best effort).
# Globals:   DISCORD_WEBHOOK (read)
# Arguments: $1 - level: SUCCESS, WARNING, anything else is shown as red
#            $2 - message text
# Returns:   0 always; delivery failures are deliberately ignored so that
#            alerting can never abort the pipeline.
#######################################
send_alert() {
  local level="$1" message="$2"
  if [[ -z "$DISCORD_WEBHOOK" ]]; then
    return 0
  fi

  local color=16711680 # Red
  case "$level" in
    SUCCESS) color=65280 ;;    # Green
    WARNING) color=16776960 ;; # Yellow
  esac

  curl -fsS -X POST "$DISCORD_WEBHOOK" \
    -H "Content-Type: application/json" \
    -d "$(jq -n \
      --arg msg "[$(date -u +'%Y-%m-%d %H:%M:%S UTC')] $message" \
      --argjson col "$color" \
      '{embeds: [{color: $col, description: $msg}]}')" || true
}

### --- Initialization & Cleanup ---
#######################################
# EXIT handler: release the lock, remove the scratch file and alert on
# failure. Installed only after the lock has been acquired, so it never
# deletes a lockfile owned by another running instance.
# Globals:   LOCKFILE, BATCH_FILE (read)
#######################################
cleanup() {
  local exit_code=$?
  trap - EXIT # make sure the handler cannot run a second time
  rm -f -- "$LOCKFILE"
  if [[ -n "$BATCH_FILE" ]]; then
    rm -f -- "$BATCH_FILE"
  fi
  if [[ "$exit_code" -ne 0 ]]; then
    send_alert ERROR "Pipeline interrupted"
  fi
  exit "$exit_code"
}

#######################################
# Take the per-day execution lock atomically (noclobber makes the redirect
# fail if the file already exists). Exits 1 when the lock is already held.
# Globals:   LOCKFILE, TODAY (read)
#######################################
acquire_lock() {
  if ! (
    set -o noclobber
    echo "$$" > "$LOCKFILE"
  ) 2>/dev/null; then
    echo "Error: Ingest already running for $TODAY (PID $(cat "$LOCKFILE" 2>/dev/null || echo unknown))" >&2
    exit 1
  fi
}

### --- Health Checks ---
#######################################
# Verify required tools exist, the date stamp is well-formed and MongoDB is
# reachable. Exits 1 (after alerting) on the first failed check.
# Globals:   TODAY, MONGO_URI (read)
#######################################
validate_environment() {
  local cmd
  local missing=()
  for cmd in curl bsdtar xq jq mongosh; do
    command -v "$cmd" >/dev/null || missing+=("$cmd")
  done
  if ((${#missing[@]} > 0)); then
    echo "Error: Missing dependencies: ${missing[*]}" >&2
    send_alert ERROR "Missing dependencies: ${missing[*]}"
    exit 1
  fi

  [[ "$TODAY" =~ ^[0-9]{8}$ ]] || {
    send_alert ERROR "Invalid date format: $TODAY"
    exit 1
  }

  if ! mongosh "$MONGO_URI" --quiet --eval "db.adminCommand('ping')" >/dev/null; then
    send_alert ERROR "MongoDB connection failed"
    exit 1
  fi
}

### --- Data Processing ---
#######################################
# Insert one batch of documents into the target collection.
# Globals:   MONGO_URI, COLLECTION (read)
# Arguments: $1 - file with one JSON document per line
# Outputs:   a single JSON object on stdout: {"processed": N, "failed": M}
# Returns:   mongosh exit status
#
# The batch is read from the file inside mongosh instead of being spliced
# into the --eval string: newline-delimited objects are not a valid JS
# array literal, and a large batch would overflow the argv size limit.
#######################################
process_batch() {
  local batch_file="$1"
  GRANTS_COLLECTION="$COLLECTION" GRANTS_BATCH_FILE="$batch_file" \
    mongosh "$MONGO_URI" --quiet --eval '
      const fs = require("fs");
      const batch = fs.readFileSync(process.env.GRANTS_BATCH_FILE, "utf8")
        .split("\n")
        .filter((line) => line.trim() !== "")
        .map((line) => JSON.parse(line));

      // Use insertedCount when the result carries it, otherwise count the
      // returned insertedIds.
      const countInserted = (res) =>
        (res && typeof res.insertedCount === "number")
          ? res.insertedCount
          : Object.keys((res && res.insertedIds) || {}).length;

      let inserted = 0;
      try {
        inserted = countInserted(
          db.getCollection(process.env.GRANTS_COLLECTION)
            .insertMany(batch, {ordered: false})
        );
      } catch (err) {
        // With ordered:false a partial failure is reported by throwing.
        // NOTE(review): assumes the bulk-write error exposes a `result`
        // object describing what was written - anything else is rethrown.
        if (!err.result) { throw err; }
        inserted = countInserted(err.result);
      }

      print(JSON.stringify({
        processed: inserted,
        failed: batch.length - inserted
      }));
    '
}

### --- Main Pipeline ---
#######################################
# Run the ingest: skip if today is already loaded, otherwise stream the
# extract through xq and insert it batch by batch, then report totals.
# Globals:   MONGO_URI, COLLECTION, TODAY, BASE_URL, BATCH_SIZE,
#            BATCH_FILE (read)
# Outputs:   progress lines on stdout (the caller tees them to LOG_FILE)
# Returns:   0 on success or skip, 1 when nothing could be ingested
#######################################
main() {
  validate_environment

  # Check for existing data
  local existing_count
  existing_count=$(mongosh "$MONGO_URI" --quiet --eval "
    db.${COLLECTION}.countDocuments({'_bi_metadata.ingest_date': '$TODAY'})
  ")
  if ! [[ "$existing_count" =~ ^[0-9]+$ ]]; then
    send_alert ERROR "Unexpected document count from MongoDB: $existing_count"
    return 1
  fi

  if [ "$existing_count" -gt 0 ]; then
    send_alert WARNING "Skipping ingest: $existing_count records already exist"
    return 0
  fi

  # Plain echo: the caller already appends everything main prints to
  # LOG_FILE, so teeing here as well would duplicate every line.
  echo "=== Starting ingest for $TODAY ==="
  local start_time
  start_time=$(date +%s)
  local total_processed=0 total_failed=0
  local batch_count=0
  local json batch_stats batch_processed batch_failed

  : > "$BATCH_FILE"

  # Records arrive on fd 3 so that commands run inside the loop (mongosh)
  # cannot consume them from stdin. Process substitution keeps the loop in
  # this shell, so the counters survive it.
  while IFS= read -r json <&3; do
    printf '%s\n' "$json" >> "$BATCH_FILE"
    # Not ((batch_count++)): that returns status 1 when the old value is 0
    # and would abort the script under `set -e` on the very first record.
    batch_count=$((batch_count + 1))

    if ((batch_count % BATCH_SIZE == 0)); then
      batch_stats=$(process_batch "$BATCH_FILE")
      batch_processed=$(jq -r '.processed' <<< "$batch_stats")
      batch_failed=$(jq -r '.failed' <<< "$batch_stats")

      total_processed=$((total_processed + batch_processed))
      total_failed=$((total_failed + batch_failed))

      : > "$BATCH_FILE" # Reset batch
      echo "Processed $batch_count records ($total_processed success, $total_failed failed)"
    fi
  done 3< <(
    curl -LfsS "$BASE_URL/GrantsDBExtract${TODAY}v2.zip" \
      | bsdtar -xOf - '*.xml' \
      | xq -c --arg today "$TODAY" '
        .Opportunities.Opportunity[] |
        try (
          ._bi_metadata = {
            ingest_date: $today,
            quality_score: {
              completeness: ((.OpportunityTitle | length) / 255),
              has_funding: (.AwardCeiling != null),
              has_deadline: (.CloseDate != null)
            },
            stats: {
              funding_type: .FundingInstrumentType,
              category: .OpportunityCategory,
              award_range: {
                floor: (.AwardFloor | (tonumber? // null)),
                ceiling: (.AwardCeiling | (tonumber? // null))
              }
            }
          }
        ) catch empty'
  )

  # A failure inside the process substitution (download error, bad archive)
  # is not reported to this shell; an empty stream is the visible symptom.
  if ((batch_count == 0)); then
    echo "Error: no records extracted from GrantsDBExtract${TODAY}v2.zip" >&2
    send_alert ERROR "No records extracted from GrantsDBExtract${TODAY}v2.zip"
    return 1
  fi

  # Process final batch
  if [[ -s "$BATCH_FILE" ]]; then
    batch_stats=$(process_batch "$BATCH_FILE")
    batch_processed=$(jq -r '.processed' <<< "$batch_stats")
    batch_failed=$(jq -r '.failed' <<< "$batch_stats")
    total_processed=$((total_processed + batch_processed))
    total_failed=$((total_failed + batch_failed))
  fi

  # Final report
  local duration=$(($(date +%s) - start_time))
  local msg="Ingested $total_processed records ($total_failed failed) in ${duration}s"
  echo "$msg"

  if [[ "$total_failed" -eq 0 ]]; then
    send_alert SUCCESS "$msg"
  else
    send_alert WARNING "$msg"
  fi
}

### --- Execution ---
# Atomic execution lock. The EXIT trap is installed only once the lock is
# ours, so an instance that loses the race leaves the winner's lock alone.
acquire_lock
trap cleanup EXIT
# Turn signals into a normal exit so cleanup runs exactly once via EXIT.
trap 'exit 130' INT
trap 'exit 143' TERM

BATCH_FILE=$(mktemp)

main 2>&1 | tee -a "$LOG_FILE"