#!/bin/bash
# Grants.gov XML Pipeline - Master Class Edition (Corrected)
# Script name: grants_xml_pipeline.sh
#
# Downloads the daily Grants.gov extract, converts every opportunity to one
# JSON document per line with xq, attaches a `_bi_metadata` sub-document and
# bulk-inserts the documents into a monthly MongoDB collection.
#
# Environment:
#   GRANTS_MONGO_URI  (required) MongoDB connection string.
#   DISCORD_WEBHOOK   (optional) Discord webhook URL; alerts are skipped
#                     when it is unset or empty.
#
# Requires: curl, bsdtar, xq, jq, mongosh, split, mktemp
set -euo pipefail

### --- Configuration ---
readonly MONGO_URI="${GRANTS_MONGO_URI:?Required env var}"
readonly DISCORD_WEBHOOK="${DISCORD_WEBHOOK:-}"
readonly BASE_URL="https://prod-grants-gov-chatbot.s3.amazonaws.com/extracts"
# Declared separately from the assignment so a failing `date` is not masked.
TODAY=$(date -u +%Y%m%d)
readonly TODAY
# Derived from TODAY (not a second `date` call) so both always agree, even
# when the script starts right at a month boundary.
readonly COLLECTION="grants_${TODAY:0:6}"
readonly LOCKFILE="/tmp/grants_ingest_$TODAY.lock"
readonly BATCH_SIZE=1000
readonly LOG_FILE="/var/log/grants_ingest_$TODAY.log"

# Runtime state consulted by cleanup().
lock_acquired=0 # 1 once this process owns LOCKFILE
work_dir=""     # scratch directory for the extracted records and batches

### --- Helpers ---

# Print an error message to stderr and terminate with status 1.
die() {
  printf 'Error: %s\n' "$*" >&2
  exit 1
}

### --- Notification System ---

#######################################
# Post a message to Discord (best effort; never fails the caller).
# Globals:   DISCORD_WEBHOOK (read)
# Arguments: $1 - level: SUCCESS, WARNING, anything else is shown as error
#            $2 - message text
# Returns:   always 0
#######################################
send_alert() {
  local level="$1" message="$2"
  [[ -n "$DISCORD_WEBHOOK" ]] || return 0

  local color=16711680 # Red
  case "$level" in
    SUCCESS) color=65280 ;;    # Green
    WARNING) color=16776960 ;; # Yellow
  esac

  # jq builds the payload so quotes/newlines in the message are escaped.
  local payload
  payload=$(jq -n \
    --arg msg "[$(date -u +'%Y-%m-%d %H:%M:%S UTC')] $message" \
    --argjson col "$color" \
    '{embeds: [{color: $col, description: $msg}]}') || return 0

  # Alerts are advisory: a webhook outage must not abort the ingest.
  curl -fsS --max-time 15 -X POST "$DISCORD_WEBHOOK" \
    -H "Content-Type: application/json" \
    -d "$payload" || true
}

### --- Initialization & Cleanup ---

#######################################
# EXIT handler: remove scratch data, release the lock if (and only if) this
# process owns it, and alert when a lock-holding run ends with an error.
# Globals: work_dir, lock_acquired, LOCKFILE (read)
#######################################
cleanup() {
  local exit_code=$?
  if [[ -n "$work_dir" ]]; then
    rm -rf -- "$work_dir"
  fi
  if ((lock_acquired)); then
    rm -f -- "$LOCKFILE"
    if ((exit_code != 0)); then
      send_alert ERROR "Pipeline interrupted"
    fi
  fi
  exit "$exit_code"
}

#######################################
# Take the per-day execution lock atomically (noclobber makes the redirect
# fail when the file already exists).
# Globals: LOCKFILE (written), lock_acquired (written), TODAY (read)
# Returns: 0 when the lock was taken, 1 when another run holds it
#######################################
acquire_lock() {
  if (
    set -o noclobber
    echo "$$" >"$LOCKFILE"
  ) 2>/dev/null; then
    lock_acquired=1
    return 0
  fi
  local holder
  holder=$(cat -- "$LOCKFILE" 2>/dev/null || true)
  echo "Error: Ingest already running for $TODAY (PID ${holder:-unknown})" >&2
  return 1
}

### --- Health Checks ---

#######################################
# Verify required tools, the date format and MongoDB connectivity.
# Exits with status 1 (after alerting) when a check fails.
#######################################
validate_environment() {
  local tool
  for tool in curl bsdtar xq jq mongosh split; do
    command -v "$tool" >/dev/null || die "Required tool not found: $tool"
  done

  if [[ ! "$TODAY" =~ ^[0-9]{8}$ ]]; then
    send_alert ERROR "Invalid date format: $TODAY"
    exit 1
  fi

  if ! mongosh "$MONGO_URI" --quiet --eval "db.adminCommand('ping')" >/dev/null; then
    send_alert ERROR "MongoDB connection failed"
    exit 1
  fi
}

### --- Data Processing ---

#######################################
# Download today's extract and write one JSON document per line.
# Globals:   BASE_URL, TODAY (read)
# Arguments: $1 - output file for the newline-delimited JSON
# Returns:   non-zero when any stage (download, unzip, transform) fails;
#            `pipefail` propagates the failing stage's status.
#######################################
extract_records() {
  local out_file="$1"
  # `tonumber? // null` keeps the record when an award bound is missing or
  # non-numeric; a bare `tonumber?` yields `empty`, which would make the
  # whole assignment produce nothing and silently drop the opportunity.
  curl -LfsS "$BASE_URL/GrantsDBExtract${TODAY}v2.zip" \
    | bsdtar -xOf - '*.xml' \
    | xq -c --arg today "$TODAY" '
        .Opportunities.Opportunity[] |
        try (
          ._bi_metadata = {
            ingest_date: $today,
            quality_score: {
              completeness: ((.OpportunityTitle | length) / 255),
              has_funding: (.AwardCeiling != null),
              has_deadline: (.CloseDate != null)
            },
            stats: {
              funding_type: .FundingInstrumentType,
              category: .OpportunityCategory,
              award_range: {
                floor: (.AwardFloor | (tonumber? // null)),
                ceiling: (.AwardCeiling | (tonumber? // null))
              }
            }
          }
        ) catch empty' >"$out_file"
}

#######################################
# Insert one batch file (newline-delimited JSON) into MongoDB.
# The file path and collection name travel through the environment and the
# documents are read from disk inside mongosh, so the batch is never placed
# on the command line (argument-length limit) nor spliced into JS source.
# Globals:   MONGO_URI, COLLECTION (read)
# Arguments: $1 - path to the batch file
# Outputs:   one JSON line: {"processed": N, "failed": M}
# Returns:   mongosh exit status
#######################################
process_batch() {
  local batch_file="$1"
  BATCH_FILE="$batch_file" COLLECTION_NAME="$COLLECTION" \
    mongosh "$MONGO_URI" --quiet --eval '
      const fs = require("fs");
      const batch = fs.readFileSync(process.env.BATCH_FILE, "utf8")
        .split("\n")
        .filter((line) => line.trim() !== "")
        .map((line) => JSON.parse(line));
      let inserted = 0;
      if (batch.length > 0) {
        try {
          const result = db.getCollection(process.env.COLLECTION_NAME)
            .insertMany(batch, {ordered: false});
          // The mongosh result exposes insertedIds, not insertedCount.
          inserted = Object.keys(result.insertedIds).length;
        } catch (e) {
          // With ordered:false a partial failure is thrown as a bulk write
          // error. NOTE(review): the inserted-count field name differs
          // between driver versions, hence the fallbacks - confirm against
          // the deployed mongosh.
          if (e.name !== "MongoBulkWriteError" && !e.writeErrors) {
            throw e;
          }
          inserted = e.insertedCount
            ?? (e.result ? (e.result.insertedCount ?? e.result.nInserted) : undefined)
            ?? 0;
        }
      }
      print(JSON.stringify({
        processed: inserted,
        failed: batch.length - inserted
      }));
    '
}

### --- Main Pipeline ---

#######################################
# Run the full ingest: health checks, duplicate guard, extract, batched
# insert and final report. All output is plain stdout/stderr; the caller
# pipes it into the log file.
# Globals: MONGO_URI, COLLECTION, TODAY, BATCH_SIZE, work_dir (read)
#######################################
main() {
  validate_environment

  # Check for existing data
  local existing_count
  existing_count=$(mongosh "$MONGO_URI" --quiet --eval "
    db.getCollection('$COLLECTION').countDocuments({'_bi_metadata.ingest_date': '$TODAY'})
  ") || die "Could not query existing records"
  existing_count=${existing_count//[[:space:]]/}
  [[ "$existing_count" =~ ^[0-9]+$ ]] \
    || die "Unexpected record count from MongoDB: $existing_count"

  if [ "$existing_count" -gt 0 ]; then
    send_alert WARNING "Skipping ingest: $existing_count records already exist"
    return 0
  fi

  echo "=== Starting ingest for $TODAY ==="
  local start_time
  start_time=$(date +%s)
  local total_processed=0 total_failed=0

  # Materialise the records first: a failure inside `< <(...)` would be
  # invisible to the shell and a broken download would look like an empty,
  # successful run.
  local records_file="$work_dir/records.ndjson"
  extract_records "$records_file" \
    || die "Download or transform failed for GrantsDBExtract${TODAY}v2.zip"

  if [[ -s "$records_file" ]]; then
    split -l "$BATCH_SIZE" -a 6 "$records_file" "$work_dir/batch_"
  fi

  local batch_file batch_stats counts batch_processed batch_failed
  for batch_file in "$work_dir"/batch_*; do
    [[ -e "$batch_file" ]] || continue # glob matched nothing

    batch_stats=$(process_batch "$batch_file") \
      || die "MongoDB insert failed for ${batch_file##*/}"
    counts=$(jq -r '"\(.processed) \(.failed)"' <<<"$batch_stats") \
      || die "Unparseable batch result: $batch_stats"
    read -r batch_processed batch_failed <<<"$counts"

    total_processed=$((total_processed + batch_processed))
    total_failed=$((total_failed + batch_failed))
    rm -f -- "$batch_file"

    echo "Processed $((total_processed + total_failed)) records ($total_processed success, $total_failed failed)"
  done

  # Final report
  local duration=$(($(date +%s) - start_time))
  local msg="Ingested $total_processed records ($total_failed failed) in ${duration}s"
  echo "$msg"

  # An extract that yields nothing is suspicious, so it is not reported as
  # a clean success.
  if ((total_failed == 0 && total_processed > 0)); then
    send_alert SUCCESS "$msg"
  else
    send_alert WARNING "$msg"
  fi
}

### --- Execution ---
# Only EXIT runs cleanup; the signal traps just convert the signal into an
# exit so cleanup (and its alert) runs exactly once.
trap cleanup EXIT
trap 'exit 130' INT
trap 'exit 143' TERM

# Atomic execution lock
acquire_lock || exit 1

work_dir=$(mktemp -d) || die "mktemp failed"

# main runs in a pipeline subshell; `pipefail` carries its failure back here
# so `set -e` aborts and cleanup reports it. tee is the single log writer.
main 2>&1 | tee -a "$LOG_FILE"