Update projects/grants-xml-sync.sh
This commit is contained in:
@@ -1,131 +1,158 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
# grants_xml_pipeline - Enterprise-grade Grants.gov data ingestion system
|
# Grants.gov XML Pipeline
|
||||||
# Version: 1.1.0
|
set -euo pipefail
|
||||||
#
|
|
||||||
# Features:
|
|
||||||
# - Zero-tempfile streaming architecture
|
|
||||||
# - Cryptographic data provenance
|
|
||||||
# - Schema version awareness
|
|
||||||
# - Automated quality thresholds
|
|
||||||
# - Robust error handling and logging
|
|
||||||
#
|
|
||||||
# Usage:
|
|
||||||
# GRANTS_MONGO_URI="mongodb://cluster.example.com" ./grants_xml_pipeline [-v]
|
|
||||||
#
|
|
||||||
# Dependencies:
|
|
||||||
# - libarchive-tools (bsdtar)
|
|
||||||
# - yq (xq) for XML→JSON conversion
|
|
||||||
# - MongoDB shell tools
|
|
||||||
|
|
||||||
set -eo pipefail
|
|
||||||
shopt -s lastpipe
|
shopt -s lastpipe
|
||||||
trap 'handle_error $LINENO' ERR
|
|
||||||
|
|
||||||
## === Configuration ===
|
### --- Configuration ---
|
||||||
readonly VERSION="1.1.0"
|
readonly MONGO_URI="${GRANTS_MONGO_URI:?Required env var}"
|
||||||
readonly SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
readonly DISCORD_WEBHOOK="${DISCORD_WEBHOOK:-}"
|
||||||
readonly MONGO_URI="${GRANTS_MONGO_URI:?Error: GRANTS_MONGO_URI environment variable required}"
|
|
||||||
readonly COLLECTION_PREFIX="opportunities"
|
|
||||||
readonly BASE_URL="https://prod-grants-gov-chatbot.s3.amazonaws.com/extracts"
|
readonly BASE_URL="https://prod-grants-gov-chatbot.s3.amazonaws.com/extracts"
|
||||||
readonly QUALITY_THRESHOLD_PCT="${GRANTS_QUALITY_THRESHOLD:-80}"
|
readonly TODAY=$(date -u +%Y%m%d)
|
||||||
readonly SCHEMA_VALIDATION_ENABLED="${GRANTS_SCHEMA_VALIDATION:-true}"
|
readonly COLLECTION="grants_$(date -u +%Y%m)"
|
||||||
readonly BULK_BATCH_SIZE=1000
|
readonly LOCKFILE="/tmp/grants_ingest_$TODAY.lock"
|
||||||
VERBOSE=false
|
readonly BATCH_SIZE=1000
|
||||||
|
readonly LOG_FILE="/var/log/grants_ingest_$TODAY.log"
|
||||||
|
|
||||||
## === Initialization Checks ===
|
### --- Initialization & Cleanup ---
|
||||||
|
cleanup() {
|
||||||
|
local exit_code=$?
|
||||||
|
rm -f "$LOCKFILE"
|
||||||
|
[ $exit_code -ne 0 ] && send_alert ERROR "Pipeline interrupted"
|
||||||
|
exit $exit_code
|
||||||
|
}
|
||||||
|
trap cleanup EXIT INT TERM
|
||||||
|
|
||||||
|
# Atomic execution lock
|
||||||
|
if ! (set -o noclobber; echo $$ > "$LOCKFILE") 2>/dev/null; then
|
||||||
|
echo "Error: Ingest already running for $TODAY (PID $(<"$LOCKFILE"))" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
### --- Health Checks ---
|
||||||
validate_environment() {
|
validate_environment() {
|
||||||
# Verify all dependencies exist
|
[[ "$TODAY" =~ ^[0-9]{8}$ ]] || {
|
||||||
local missing=()
|
send_alert ERROR "Invalid date format: $TODAY"
|
||||||
for cmd in bsdtar xq mongosh curl xmllint; do
|
|
||||||
if ! command -v "$cmd" >/dev/null; then
|
|
||||||
missing+=("$cmd")
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
|
|
||||||
if (( ${#missing[@]} > 0 )); then
|
|
||||||
log_error "Missing dependencies: ${missing[*]}"
|
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
}
|
||||||
|
|
||||||
# Verify MongoDB connection
|
if ! mongosh "$MONGO_URI" --quiet --eval "db.adminCommand('ping')" >/dev/null; then
|
||||||
if ! mongosh "$MONGO_URI" --eval "db.version()" >/dev/null; then
|
send_alert ERROR "MongoDB connection failed"
|
||||||
log_error "Cannot connect to MongoDB at $MONGO_URI"
|
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
## === Logging Utilities ===
|
### --- Notification System ---
|
||||||
log() {
|
send_alert() {
|
||||||
local level="$1"
|
local level="$1" message="$2"
|
||||||
local message="$2"
|
[ -z "$DISCORD_WEBHOOK" ] && return
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S')] [${level}] ${message}" >&2
|
|
||||||
}
|
|
||||||
|
|
||||||
log_info() {
|
local color=16711680 # Red
|
||||||
if $VERBOSE || [[ "$1" != "DEBUG" ]]; then
|
case "$level" in
|
||||||
log "$@"
|
SUCCESS) color=65280 ;; # Green
|
||||||
fi
|
WARNING) color=16776960 ;; # Yellow
|
||||||
}
|
|
||||||
|
|
||||||
log_error() {
|
|
||||||
log "ERROR" "$1"
|
|
||||||
exit 1
|
|
||||||
}
|
|
||||||
|
|
||||||
handle_error() {
|
|
||||||
local line="$1"
|
|
||||||
log_error "Error occurred at line $line"
|
|
||||||
}
|
|
||||||
|
|
||||||
## === Data Transformation ===
|
|
||||||
transform_to_json() {
|
|
||||||
local schema_version="$1"
|
|
||||||
xq -c --arg schema_version "$schema_version" '
|
|
||||||
.Opportunities.Opportunity[] |
|
|
||||||
._metadata = {
|
|
||||||
ingestion_date: now|todateiso8601,
|
|
||||||
schema_version: $schema_version,
|
|
||||||
content_sha256: env.CONTENT_SHA256,
|
|
||||||
pipeline_version: env.VERSION
|
|
||||||
}'
|
|
||||||
}
|
|
||||||
|
|
||||||
## === Main Pipeline ===
|
|
||||||
execute_data_pipeline() {
|
|
||||||
local current_date=$(date +%Y%m%d)
|
|
||||||
local collection_name="${COLLECTION_PREFIX}_$(date +%Y%m)"
|
|
||||||
local schema_version=$(get_schema_version)
|
|
||||||
|
|
||||||
log_info "INFO" "Starting ingestion for ${current_date} (schema ${schema_version})"
|
|
||||||
|
|
||||||
# Establish quality baseline
|
|
||||||
local baseline_count=$(get_previous_document_count "$collection_name")
|
|
||||||
log_info "INFO" "Baseline document count: ${baseline_count}"
|
|
||||||
|
|
||||||
# Execute streaming pipeline
|
|
||||||
curl -LfsS "${BASE_URL}/GrantsDBExtract${current_date}v2.zip" |
|
|
||||||
tee >(compute_content_hash) |
|
|
||||||
bsdtar -xOf - '*.xml' |
|
|
||||||
validate_xml_stream |
|
|
||||||
transform_to_json "$schema_version" |
|
|
||||||
import_to_mongodb "$collection_name" "$baseline_count"
|
|
||||||
|
|
||||||
log_info "INFO" "Pipeline completed successfully"
|
|
||||||
}
|
|
||||||
|
|
||||||
## === Entry Point ===
|
|
||||||
main() {
|
|
||||||
# Parse command-line arguments
|
|
||||||
while getopts ":v" opt; do
|
|
||||||
case $opt in
|
|
||||||
v) VERBOSE=true;;
|
|
||||||
\?) log_error "Invalid option: -$OPTARG";;
|
|
||||||
esac
|
esac
|
||||||
done
|
|
||||||
|
|
||||||
validate_environment
|
curl -fsS -X POST "$DISCORD_WEBHOOK" \
|
||||||
execute_data_pipeline
|
-H "Content-Type: application/json" \
|
||||||
exit 0
|
-d "$(jq -n \
|
||||||
|
--arg msg "[$(date -u +'%Y-%m-%d %H:%M:%S UTC')] $message" \
|
||||||
|
--argjson col "$color" \
|
||||||
|
'{embeds: [{color: $col, description: $msg}]}')" || true
|
||||||
}
|
}
|
||||||
|
|
||||||
main "$@"
|
### --- Data Processing ---
|
||||||
|
process_batch() {
|
||||||
|
local batch_file="$1"
|
||||||
|
mongosh "$MONGO_URI" --quiet --eval "
|
||||||
|
const COLLECTION = '$COLLECTION';
|
||||||
|
const batch = $(cat "$batch_file");
|
||||||
|
const result = db[COLLECTION].insertMany(batch, {ordered: false});
|
||||||
|
print(JSON.stringify({
|
||||||
|
processed: result.insertedCount,
|
||||||
|
failed: batch.length - result.insertedCount
|
||||||
|
}));
|
||||||
|
"
|
||||||
|
}
|
||||||
|
|
||||||
|
### --- Main Pipeline ---
|
||||||
|
main() {
|
||||||
|
validate_environment
|
||||||
|
|
||||||
|
# Check for existing data
|
||||||
|
local existing_count=$(mongosh "$MONGO_URI" --quiet --eval "
|
||||||
|
db.${COLLECTION}.countDocuments({'_bi_metadata.ingest_date': '$TODAY'})
|
||||||
|
")
|
||||||
|
|
||||||
|
if [ "$existing_count" -gt 0 ]; then
|
||||||
|
send_alert WARNING "Skipping ingest: $existing_count records already exist"
|
||||||
|
return 0
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "=== Starting ingest for $TODAY ===" | tee "$LOG_FILE"
|
||||||
|
local start_time=$(date +%s)
|
||||||
|
local total_processed=0 total_failed=0
|
||||||
|
local batch_count=0
|
||||||
|
local batch_file=$(mktemp)
|
||||||
|
|
||||||
|
# FIXED: Use process substitution to preserve variable scope
|
||||||
|
while IFS= read -r json; do
|
||||||
|
echo "$json" >> "$batch_file"
|
||||||
|
((batch_count++))
|
||||||
|
|
||||||
|
if [ $((batch_count % BATCH_SIZE)) -eq 0 ]; then
|
||||||
|
local batch_stats=$(process_batch "$batch_file")
|
||||||
|
local batch_processed=$(jq -r '.processed' <<< "$batch_stats")
|
||||||
|
local batch_failed=$(jq -r '.failed' <<< "$batch_stats")
|
||||||
|
|
||||||
|
total_processed=$((total_processed + batch_processed))
|
||||||
|
total_failed=$((total_failed + batch_failed))
|
||||||
|
|
||||||
|
> "$batch_file" # Reset batch
|
||||||
|
echo "Processed $batch_count records ($total_processed success, $total_failed failed)" | tee -a "$LOG_FILE"
|
||||||
|
fi
|
||||||
|
done < <(
|
||||||
|
curl -LfsS "$BASE_URL/GrantsDBExtract${TODAY}v2.zip" | \
|
||||||
|
bsdtar -xOf - '*.xml' | \
|
||||||
|
xq -c --arg today "$TODAY" '
|
||||||
|
.Opportunities.Opportunity[] |
|
||||||
|
try (
|
||||||
|
._bi_metadata = {
|
||||||
|
ingest_date: $today,
|
||||||
|
quality_score: {
|
||||||
|
completeness: ((.OpportunityTitle | length) / 255),
|
||||||
|
has_funding: (.AwardCeiling != null),
|
||||||
|
has_deadline: (.CloseDate != null)
|
||||||
|
},
|
||||||
|
stats: {
|
||||||
|
funding_type: .FundingInstrumentType,
|
||||||
|
category: .OpportunityCategory,
|
||||||
|
award_range: {
|
||||||
|
floor: (.AwardFloor | tonumber?),
|
||||||
|
ceiling: (.AwardCeiling | tonumber?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
) catch empty'
|
||||||
|
)
|
||||||
|
|
||||||
|
# Process final batch
|
||||||
|
if [ -s "$batch_file" ]; then
|
||||||
|
local batch_stats=$(process_batch "$batch_file")
|
||||||
|
total_processed=$((total_processed + $(jq -r '.processed' <<< "$batch_stats")))
|
||||||
|
total_failed=$((total_failed + $(jq -r '.failed' <<< "$batch_stats")))
|
||||||
|
fi
|
||||||
|
rm -f "$batch_file"
|
||||||
|
|
||||||
|
# Final report
|
||||||
|
local duration=$(( $(date +%s) - start_time ))
|
||||||
|
local msg="Ingested $total_processed records ($total_failed failed) in ${duration}s"
|
||||||
|
|
||||||
|
if [ "$total_failed" -eq 0 ]; then
|
||||||
|
send_alert SUCCESS "$msg"
|
||||||
|
else
|
||||||
|
send_alert WARNING "$msg"
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
### --- Execution ---
|
||||||
|
main 2>&1 | tee -a "$LOG_FILE"
|
||||||
Reference in New Issue
Block a user