Update projects/grants_xml_pipeline.md
### **Grants.gov XML Pipeline: Robust Daily Ingest with BI Metadata**
*(Balancing "just works" with observability)*

#### **Key Requirements**
1. **Daily automated import** (stream XML → MongoDB)
2. **Basic health checks** (fail fast + Discord alerts)
3. **Embedded business intelligence** (daily diffs, stats)
4. **Zero-tempfile streaming** (handle 76MB files efficiently)

---

### **Final Script** (`grants_daily_ingest.sh`)
```bash
#!/bin/bash
set -euo pipefail

# --- Config ---
DISCORD_WEBHOOK="https://discord.com/api/webhooks/..."  # For alerts
MONGO_URI="${GRANTS_MONGO_URI:?GRANTS_MONGO_URI must be set}"  # From env; should name the target database
BASE_URL="https://prod-grants-gov-chatbot.s3.amazonaws.com/extracts"
TODAY=$(date +%Y%m%d)
COLLECTION="grants_${TODAY:0:6}"  # Monthly collection, derived from TODAY so the two always agree

# --- Discord Alert Function ---
notify_discord() {
  local color=$1 message=$2
  # $message is interpolated as-is, so keep it free of double quotes and backslashes
  curl -sfS -X POST "$DISCORD_WEBHOOK" \
    -H "Content-Type: application/json" \
    -d "{\"embeds\":[{\"color\":$color,\"description\":\"$message\"}]}" || true
}

# --- Pipeline ---
{
  # Download the zip and stream its XML member to stdout (no temp files)
  curl -LfsS "${BASE_URL}/GrantsDBExtract${TODAY}v2.zip" |
    bsdtar -xOf - '*.xml' |

    # Transform with embedded BI metadata. The stats are computed once over the
    # whole array and then attached to every document, because group_by needs
    # an array and cannot run on a single opportunity object.
    xq -c --arg today "$TODAY" '
      .Opportunities.Opportunity as $all
      | {
          funding_types: (
            $all | group_by(.FundingInstrumentType) |
            map({type: .[0].FundingInstrumentType, count: length})
          ),
          categories: (
            $all | group_by(.OpportunityCategory) |
            map({category: .[0].OpportunityCategory, count: length})
          )
        } as $stats
      | $all[]
      | ._bi_metadata = {ingest_date: $today, daily_stats: $stats}
    ' |

    # Import newline-delimited JSON from stdin; mongoimport batches its writes
    # internally and exits cleanly at end of input
    mongoimport --uri "$MONGO_URI" --collection "$COLLECTION" --type json
} || {
  # On failure: Discord alert + exit
  notify_discord 16711680 "🚨 Grants ingest failed for $TODAY! $(date)"
  exit 1
}

# Success alert with stats
DOC_COUNT=$(mongosh "$MONGO_URI" --quiet --eval "db.$COLLECTION.countDocuments({'_bi_metadata.ingest_date': '$TODAY'})")
notify_discord 65280 "✅ Success! Ingested $DOC_COUNT grants for $TODAY"
```

---

### **Key Features**
1. **Streaming Architecture**
   - `curl → bsdtar → xq → mongoimport` in one pipe (no temp files)
   - The 76MB download never touches disk. Note that `xq` parses the whole XML document before it emits anything, so memory use grows with the extract size rather than staying constant

2. **Business Intelligence**
   - Embeds daily stats in each doc:
     ```json
     "_bi_metadata": {
       "ingest_date": "20250801",
       "daily_stats": {
         "funding_types": [{"type": "G", "count": 142}, ...],
         "categories": [{"category": "ACA", "count": 56}, ...]
       }
     }
     ```

3. **Discord Alerts**
   - Color-coded messages:
     - 🔴 **Red** on failure (with timestamp)
     - 🟢 **Green** on success (with doc count)

4. **Validation via Failure**
   - No explicit checks → let `curl`/`bsdtar`/`xq`/`mongoimport` fail naturally
   - `set -euo pipefail` makes a failure in any pipeline stage fail the whole pipe, which triggers the failure alert and a non-zero exit

5. **MongoDB Optimization**
   - Batched inserts (`mongoimport` groups documents into bulk writes instead of one write per document)
   - Collection per month (`grants_202508`)
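
The `daily_stats` block is the same for every document in a run, so one way to produce it is to compute the counts once over the full opportunity array and then attach the result to each record. The sketch below shows that pattern on plain JSON with `jq`; it assumes the `{"Opportunities": {"Opportunity": [...]}}` shape used in the script, and the function name `annotate_with_stats` is hypothetical. The same filter should work under `xq` when it is the `jq` wrapper from the `yq` package, which is an assumption about the installed tool.

```bash
#!/usr/bin/env bash
# Sketch only: annotate_with_stats is a hypothetical helper, not part of the script.
# Reads one JSON document on stdin shaped like
#   {"Opportunities": {"Opportunity": [ {...}, {...} ]}}
# and prints one compact JSON object per opportunity, each carrying the
# same _bi_metadata.daily_stats computed over the whole array.
annotate_with_stats() {
  local today=$1
  jq -c --arg today "$today" '
    .Opportunities.Opportunity as $all
    | {
        funding_types: ($all | group_by(.FundingInstrumentType)
                             | map({type: .[0].FundingInstrumentType, count: length})),
        categories:    ($all | group_by(.OpportunityCategory)
                             | map({category: .[0].OpportunityCategory, count: length}))
      } as $stats
    | $all[]
    | ._bi_metadata = {ingest_date: $today, daily_stats: $stats}
  '
}

# Example call (sample data is made up for illustration):
#   echo '{"Opportunities":{"Opportunity":[{"FundingInstrumentType":"G","OpportunityCategory":"D"}]}}' \
#     | annotate_with_stats 20250801
```

Binding the array to `$all` first is what lets `group_by` see every opportunity; applying `group_by` after `.Opportunity[]` would hand it a single object and fail.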

---

### **Deployment**
1. **Cron Job** (runs daily at 5 AM):
   ```bash
   0 5 * * * GRANTS_MONGO_URI="mongodb://..." /path/to/grants_daily_ingest.sh
   ```

2. **Sample Discord Alert**
   Example success message: "✅ Success! Ingested 1429 grants for 20250801"
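
`notify_discord` splices `$message` straight into a JSON string, so a message that contains a double quote, a backslash, or a newline would produce an invalid payload and the alert would be dropped silently (the `|| true` hides the error). A minimal sketch of one way to guard against that follows. The helpers `json_escape` and `discord_payload` are hypothetical names, not part of the script, and the escaping covers only the characters listed in the comment.

```bash
#!/usr/bin/env bash
# Sketch only: json_escape and discord_payload are hypothetical helpers.

# Escape a string for use inside a JSON string literal.
# Handles backslash, double quote, newline, carriage return, and tab;
# other control characters are not covered.
json_escape() {
  local s=$1
  s=${s//\\/\\\\}       # backslash first, so later escapes are not doubled
  s=${s//\"/\\\"}       # double quote
  s=${s//$'\n'/\\n}     # newline
  s=${s//$'\r'/\\r}     # carriage return
  s=${s//$'\t'/\\t}     # tab
  printf '%s' "$s"
}

# Build the embed payload in the same shape notify_discord sends.
discord_payload() {
  local color=$1 message=$2
  printf '{"embeds":[{"color":%d,"description":"%s"}]}' \
    "$color" "$(json_escape "$message")"
}

# Example: print the payload for a success message.
discord_payload 65280 'Ingested 1429 grants for "20250801"'
echo
```

Inside `notify_discord`, the `-d` argument could then become `-d "$(discord_payload "$color" "$message")"`.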

---

### **Tradeoffs**
- **No Schema Validation**: Relies on Grants.gov maintaining consistent XML
- **No Dependency Checks**: Assumes OS has `bsdtar`, `xq`, etc.
- **No Retry Logic**: Fails fast on first error
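
If the last two tradeoffs ever become a problem, both can be addressed with a few lines of shell. The sketch below is one possible shape, not part of the script: `require_cmds` and `retry` are hypothetical helper names, and the attempt count and delay are arbitrary parameters.

```bash
#!/usr/bin/env bash
# Sketch only: require_cmds and retry are hypothetical helpers.

# Return non-zero and name the first command that is not on PATH.
require_cmds() {
  local cmd
  for cmd in "$@"; do
    if ! command -v "$cmd" >/dev/null 2>&1; then
      echo "missing dependency: $cmd" >&2
      return 1
    fi
  done
}

# retry MAX_ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Re-runs COMMAND until it succeeds or MAX_ATTEMPTS is reached.
retry() {
  local max=$1 delay=$2 attempt=1
  shift 2
  until "$@"; do
    if (( attempt >= max )); then
      echo "giving up after $attempt attempts: $*" >&2
      return 1
    fi
    attempt=$((attempt + 1))
    sleep "$delay"
  done
}
```

A preflight call such as `require_cmds curl bsdtar xq mongosh` near the top of the script would turn a missing tool into a clear error before any download starts. `retry` is safest around idempotent steps: retrying the whole streaming pipe after a mid-stream failure could insert some documents twice.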

For your fully controlled environment, this balances **simplicity** with **observability**. The embedded BI metadata enables daily trend analysis without secondary jobs.

Want me to tweak any aspect (e.g., add diff tracking vs. yesterday)?

---

### **Ultra-Lean "Just Works" Pipeline**
Since you control **everything except**:
1. Schema/XML structure