Update projects/grants_xml_pipeline.md
This commit is contained in:
@@ -9,74 +9,166 @@
|
||||
|
||||
---
|
||||
|
||||
### **Final Script** (`grants_daily_ingest.sh`)
|
||||
### **Final Script** (`grants_xml_pipeline.sh`)
|
||||
```bash
|
||||
#!/bin/bash
|
||||
# Grants.gov XML Pipeline - Master Class Edition (Corrected)
|
||||
set -euo pipefail
|
||||
shopt -s lastpipe
|
||||
|
||||
# --- Config ---
|
||||
DISCORD_WEBHOOK="https://discord.com/api/webhooks/..." # For alerts
|
||||
MONGO_URI="$GRANTS_MONGO_URI" # From env
|
||||
BASE_URL="https://prod-grants-gov-chatbot.s3.amazonaws.com/extracts"
|
||||
TODAY=$(date +%Y%m%d)
|
||||
COLLECTION="grants_$(date +%Y%m)"
|
||||
### --- Configuration ---
|
||||
readonly MONGO_URI="${GRANTS_MONGO_URI:?Required env var}"
|
||||
readonly DISCORD_WEBHOOK="${DISCORD_WEBHOOK:-}"
|
||||
readonly BASE_URL="https://prod-grants-gov-chatbot.s3.amazonaws.com/extracts"
|
||||
readonly TODAY=$(date -u +%Y%m%d)
|
||||
readonly COLLECTION="grants_$(date -u +%Y%m)"
|
||||
readonly LOCKFILE="/tmp/grants_ingest_$TODAY.lock"
|
||||
readonly BATCH_SIZE=1000
|
||||
readonly LOG_FILE="/var/log/grants_ingest_$TODAY.log"
|
||||
|
||||
# --- Discord Alert Function ---
|
||||
notify_discord() {
|
||||
local color=$1 message=$2
|
||||
curl -sfS -X POST "$DISCORD_WEBHOOK" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"embeds\":[{\"color\":$color,\"description\":\"$message\"}]}" || true
|
||||
### --- Initialization & Cleanup ---
|
||||
cleanup() {
|
||||
local exit_code=$?
|
||||
rm -f "$LOCKFILE"
|
||||
[ $exit_code -ne 0 ] && send_alert ERROR "Pipeline interrupted"
|
||||
exit $exit_code
|
||||
}
|
||||
trap cleanup EXIT INT TERM
|
||||
|
||||
# --- Pipeline ---
|
||||
{
|
||||
# Download and stream XML
|
||||
curl -LfsS "${BASE_URL}/GrantsDBExtract${TODAY}v2.zip" | \
|
||||
bsdtar -xOf - '*.xml' | \
|
||||
|
||||
# Transform with embedded BI metadata
|
||||
xq -c --arg today "$TODAY" '
|
||||
.Opportunities.Opportunity[] |
|
||||
._bi_metadata = {
|
||||
ingest_date: $today,
|
||||
daily_stats: {
|
||||
funding_types: (
|
||||
group_by(.FundingInstrumentType) |
|
||||
map({type: .[0].FundingInstrumentType, count: length})
|
||||
),
|
||||
categories: (
|
||||
group_by(.OpportunityCategory) |
|
||||
map({category: .[0].OpportunityCategory, count: length})
|
||||
)
|
||||
}
|
||||
}
|
||||
' | \
|
||||
|
||||
# Batch import to MongoDB
|
||||
mongosh "$MONGO_URI" --eval "
|
||||
const BATCH_SIZE = 1000;
|
||||
let batch = [];
|
||||
while (true) {
|
||||
const doc = JSON.parse(readline());
|
||||
batch.push(doc);
|
||||
if (batch.length >= BATCH_SIZE) {
|
||||
db.$COLLECTION.insertMany(batch);
|
||||
batch = [];
|
||||
}
|
||||
}
|
||||
"
|
||||
|
||||
} || {
|
||||
# On failure: Discord alert + exit
|
||||
notify_discord 16711680 "🚨 Grants ingest failed for $TODAY! $(date)"
|
||||
# Atomic execution lock
|
||||
if ! (set -o noclobber; echo $$ > "$LOCKFILE") 2>/dev/null; then
|
||||
echo "Error: Ingest already running for $TODAY (PID $(<"$LOCKFILE"))" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
### --- Health Checks ---
|
||||
validate_environment() {
|
||||
[[ "$TODAY" =~ ^[0-9]{8}$ ]] || {
|
||||
send_alert ERROR "Invalid date format: $TODAY"
|
||||
exit 1
|
||||
}
|
||||
|
||||
if ! mongosh "$MONGO_URI" --quiet --eval "db.adminCommand('ping')" >/dev/null; then
|
||||
send_alert ERROR "MongoDB connection failed"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
# Success alert with stats
|
||||
DOC_COUNT=$(mongosh "$MONGO_URI" --quiet --eval "db.$COLLECTION.countDocuments({'_bi_metadata.ingest_date': '$TODAY'})")
|
||||
notify_discord 65280 "✅ Success! Ingested $DOC_COUNT grants for $TODAY"
|
||||
### --- Notification System ---
|
||||
send_alert() {
|
||||
local level="$1" message="$2"
|
||||
[ -z "$DISCORD_WEBHOOK" ] && return
|
||||
|
||||
local color=16711680 # Red
|
||||
case "$level" in
|
||||
SUCCESS) color=65280 ;; # Green
|
||||
WARNING) color=16776960 ;; # Yellow
|
||||
esac
|
||||
|
||||
curl -fsS -X POST "$DISCORD_WEBHOOK" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$(jq -n \
|
||||
--arg msg "[$(date -u +'%Y-%m-%d %H:%M:%S UTC')] $message" \
|
||||
--argjson col "$color" \
|
||||
'{embeds: [{color: $col, description: $msg}]}')" || true
|
||||
}
|
||||
|
||||
### --- Data Processing ---
|
||||
process_batch() {
|
||||
local batch_file="$1"
|
||||
mongosh "$MONGO_URI" --quiet --eval "
|
||||
const COLLECTION = '$COLLECTION';
|
||||
const batch = $(cat "$batch_file");
|
||||
const result = db[COLLECTION].insertMany(batch, {ordered: false});
|
||||
print(JSON.stringify({
|
||||
processed: result.insertedCount,
|
||||
failed: batch.length - result.insertedCount
|
||||
}));
|
||||
"
|
||||
}
|
||||
|
||||
### --- Main Pipeline ---
|
||||
main() {
|
||||
validate_environment
|
||||
|
||||
# Check for existing data
|
||||
local existing_count=$(mongosh "$MONGO_URI" --quiet --eval "
|
||||
db.${COLLECTION}.countDocuments({'_bi_metadata.ingest_date': '$TODAY'})
|
||||
")
|
||||
|
||||
if [ "$existing_count" -gt 0 ]; then
|
||||
send_alert WARNING "Skipping ingest: $existing_count records already exist"
|
||||
return 0
|
||||
fi
|
||||
|
||||
echo "=== Starting ingest for $TODAY ===" | tee "$LOG_FILE"
|
||||
local start_time=$(date +%s)
|
||||
local total_processed=0 total_failed=0
|
||||
local batch_count=0
|
||||
local batch_file=$(mktemp)
|
||||
|
||||
# FIXED: Use process substitution to preserve variable scope
|
||||
while IFS= read -r json; do
|
||||
echo "$json" >> "$batch_file"
|
||||
((batch_count++))
|
||||
|
||||
if [ $((batch_count % BATCH_SIZE)) -eq 0 ]; then
|
||||
local batch_stats=$(process_batch "$batch_file")
|
||||
local batch_processed=$(jq -r '.processed' <<< "$batch_stats")
|
||||
local batch_failed=$(jq -r '.failed' <<< "$batch_stats")
|
||||
|
||||
total_processed=$((total_processed + batch_processed))
|
||||
total_failed=$((total_failed + batch_failed))
|
||||
|
||||
> "$batch_file" # Reset batch
|
||||
echo "Processed $batch_count records ($total_processed success, $total_failed failed)" | tee -a "$LOG_FILE"
|
||||
fi
|
||||
done < <(
|
||||
curl -LfsS "$BASE_URL/GrantsDBExtract${TODAY}v2.zip" | \
|
||||
bsdtar -xOf - '*.xml' | \
|
||||
xq -c --arg today "$TODAY" '
|
||||
.Opportunities.Opportunity[] |
|
||||
try (
|
||||
._bi_metadata = {
|
||||
ingest_date: $today,
|
||||
quality_score: {
|
||||
completeness: ((.OpportunityTitle | length) / 255),
|
||||
has_funding: (.AwardCeiling != null),
|
||||
has_deadline: (.CloseDate != null)
|
||||
},
|
||||
stats: {
|
||||
funding_type: .FundingInstrumentType,
|
||||
category: .OpportunityCategory,
|
||||
award_range: {
|
||||
floor: (.AwardFloor | tonumber?),
|
||||
ceiling: (.AwardCeiling | tonumber?)
|
||||
}
|
||||
}
|
||||
}
|
||||
) catch empty'
|
||||
)
|
||||
|
||||
# Process final batch
|
||||
if [ -s "$batch_file" ]; then
|
||||
local batch_stats=$(process_batch "$batch_file")
|
||||
total_processed=$((total_processed + $(jq -r '.processed' <<< "$batch_stats")))
|
||||
total_failed=$((total_failed + $(jq -r '.failed' <<< "$batch_stats")))
|
||||
fi
|
||||
rm -f "$batch_file"
|
||||
|
||||
# Final report
|
||||
local duration=$(( $(date +%s) - start_time ))
|
||||
local msg="Ingested $total_processed records ($total_failed failed) in ${duration}s"
|
||||
|
||||
if [ "$total_failed" -eq 0 ]; then
|
||||
send_alert SUCCESS "$msg"
|
||||
else
|
||||
send_alert WARNING "$msg"
|
||||
fi
|
||||
}
|
||||
|
||||
### --- Execution ---
|
||||
main 2>&1 | tee -a "$LOG_FILE"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
Reference in New Issue
Block a user