diff --git a/smma/grant_starting.md b/smma/grant_starting.md index 5e6a28e..d52a84a 100644 --- a/smma/grant_starting.md +++ b/smma/grant_starting.md @@ -614,4 +614,575 @@ sources: schedule: "monthly" ``` -This layer just moves bytes around. Zero business logic. Want me to detail the validation layer next? \ No newline at end of file +This layer just moves bytes around. Zero business logic. Want me to detail the validation layer next? + +--- + +**Validation Layer:** + +```python +class DataValidator: + def __init__(self, source_type): + self.source_type = source_type + self.validation_rules = self.load_rules() + + def validate_batch(self, batch_id): + """Run all validations on a batch""" + results = ValidationResults(batch_id) + + # Structure validation + results.add(self.validate_structure()) + + # Content validation + results.add(self.validate_content()) + + # Business rules validation + results.add(self.validate_business_rules()) + + return results + +class ValidationResults: + def __init__(self, batch_id): + self.batch_id = batch_id + self.errors = [] + self.warnings = [] + self.stats = {} + self.is_valid = True +``` + +**Validation Types:** + +**1. Structure Validation** +```python +def validate_xml_structure(self, xml_data): + # Schema validation against XSD + # Required elements present + # Data types correct + pass + +def validate_csv_structure(self, csv_data): + # Expected columns present + # Header row format + # Row count reasonable + pass +``` + +**2. Content Validation** +```python +def validate_content_quality(self, records): + # Null/empty critical fields + # Date formats and ranges + # Numeric field sanity checks + # Text encoding issues + pass +``` + +**3. 
Business Rules Validation** +```python +def validate_business_rules(self, records): + # Deadline dates in future + # Award amounts reasonable ranges + # Agency codes exist in lookup tables + # CFDA numbers valid format + pass +``` + +**Validation Schema:** + +```sql +validation_results ( + id, batch_id, validation_type, status, + error_count, warning_count, record_count, + validation_details JSONB, created_at +) + +validation_errors ( + id, batch_id, record_id, error_type, + error_message, field_name, field_value, + severity, created_at +) +``` + +**Configurable Rules:** +```yaml +grants_gov_rules: + required_fields: [title, agency, deadline, amount] + date_fields: + deadline: + min_future_days: 1 + max_future_days: 730 + amount_fields: + min_value: 1000 + max_value: 50000000 + +usa_spending_rules: + # Different rules per source +``` + +**Validation Actions:** +- **PASS**: Process normally +- **WARN**: Process but flag issues +- **FAIL**: Block processing, alert operators +- **QUARANTINE**: Isolate problematic records + +**Key Features:** +- **Non-destructive**: Never modifies raw data +- **Auditable**: Track what failed and why +- **Configurable**: Rules can change without code changes +- **Granular**: Per-record and batch-level validation + +The validator just says "good/bad/ugly" - doesn't fix anything. That's the normalizer's job. 
+ +--- + +**Normalization Layer:** + +```python +class DataNormalizer: + def __init__(self, source_type): + self.source_type = source_type + self.field_mappings = self.load_field_mappings() + self.transformations = self.load_transformations() + + def normalize_batch(self, batch_id): + """Convert raw validated data to standard schema""" + raw_records = self.get_validated_records(batch_id) + normalized_records = [] + + for record in raw_records: + try: + normalized = self.normalize_record(record) + normalized_records.append(normalized) + except Exception as e: + self.log_normalization_error(record.id, e) + + return self.store_normalized_records(normalized_records) + +class RecordNormalizer: + def normalize_record(self, raw_record): + """Transform single record to standard format""" + normalized = {} + + # Field mapping + for std_field, raw_field in self.field_mappings.items(): + normalized[std_field] = self.extract_field(raw_record, raw_field) + + # Data transformations + normalized = self.apply_transformations(normalized) + + # Generate derived fields + normalized = self.add_derived_fields(normalized) + + return normalized +``` + +**Field Mapping Configs:** +```yaml +grants_gov_mappings: + title: "OpportunityTitle" + agency: "AgencyName" + deadline: "CloseDate" + amount: "AwardCeiling" + description: "Description" + cfda_number: "CFDANumbers" + +usa_spending_mappings: + recipient_name: "recipient_name" + award_amount: "federal_action_obligation" + agency: "awarding_agency_name" + award_date: "action_date" +``` + +**Data Transformations:** +```python +class FieldTransformers: + @staticmethod + def normalize_agency_name(raw_agency): + # "DEPT OF HEALTH AND HUMAN SERVICES" → "HHS" + # Handle common variations, abbreviations + pass + + @staticmethod + def parse_amount(raw_amount): + # Handle "$1,000,000", "1000000.00", "1M", etc. 
+ # Return standardized decimal + pass + + @staticmethod + def parse_date(raw_date): + # Handle multiple date formats + # Return ISO format + pass + + @staticmethod + def extract_naics_codes(description_text): + # Parse NAICS codes from text + # Return list of codes + pass +``` + +**Standard Schema (Target):** +```sql +normalized_opportunities ( + id, source, source_id, title, agency_code, + agency_name, amount_min, amount_max, deadline, + description, opportunity_type, cfda_number, + naics_codes, set_asides, geographic_scope, + created_at, updated_at, batch_id +) + +normalized_awards ( + id, source, source_id, recipient_name, + recipient_type, award_amount, award_date, + agency_code, agency_name, award_type, + description, naics_code, place_of_performance, + created_at, batch_id +) +``` + +**Normalization Tracking:** +```sql +normalization_results ( + id, batch_id, source_records, normalized_records, + error_records, transformation_stats JSONB, + processing_time, created_at +) + +normalization_errors ( + id, batch_id, source_record_id, error_type, + error_message, field_name, raw_value, + created_at +) +``` + +**Key Features:** +- **Lossy but Traceable**: Normalization may drop or reshape fields, but every normalized record can be traced back to its raw source data +- **Configurable**: Field mappings via config files +- **Extensible**: Easy to add new transformations +- **Consistent**: Same output schema regardless of source +- **Auditable**: Track what transformations were applied + +**Error Handling:** +- **Best Effort**: Extract what's possible, flag what fails +- **Partial Records**: Save normalized fields even if some fail +- **Recovery**: Can re-run normalization with updated rules + +--- + +**Enrichment Engine Interface:** + +```python +class EnrichmentEngine: + def __init__(self): + self.processors = self.load_processors() + self.dependency_graph = self.build_dependency_graph() + + def enrich_batch(self, batch_id, processor_names=None): + """Run enrichment processors on normalized batch""" + processors = processor_names or 
self.get_enabled_processors() + execution_order = self.resolve_dependencies(processors) + + results = EnrichmentResults(batch_id) + + for processor_name in execution_order: + processor = self.processors[processor_name] + try: + result = processor.process_batch(batch_id) + results.add_processor_result(processor_name, result) + except Exception as e: + results.add_error(processor_name, e) + + return results + +class BaseEnrichmentProcessor: + """Abstract base for all enrichment processors""" + + name = None + depends_on = [] # Other processors this depends on + output_tables = [] # What tables this writes to + + def process_batch(self, batch_id): + """Process a batch of normalized records""" + records = self.get_normalized_records(batch_id) + enriched_data = [] + + for record in records: + enriched = self.process_record(record) + if enriched: + enriched_data.append(enriched) + + return self.store_enriched_data(enriched_data) + + def process_record(self, record): + """Override this - core enrichment logic""" + raise NotImplementedError +``` + +**Sample Enrichment Processors:** + +```python +class DeadlineUrgencyProcessor(BaseEnrichmentProcessor): + name = "deadline_urgency" + output_tables = ["opportunity_metrics"] + + def process_record(self, opportunity): + if not opportunity.deadline: + return None + + days_remaining = (opportunity.deadline - datetime.now()).days + urgency_score = self.calculate_urgency_score(days_remaining) + + return { + 'opportunity_id': opportunity.id, + 'days_to_deadline': days_remaining, + 'urgency_score': urgency_score, + 'urgency_category': self.categorize_urgency(days_remaining) + } + +class AgencySpendingPatternsProcessor(BaseEnrichmentProcessor): + name = "agency_patterns" + depends_on = ["historical_awards"] # Needs historical data first + output_tables = ["agency_metrics"] + + def process_record(self, opportunity): + agency_history = self.get_agency_history(opportunity.agency_code) + + return { + 'agency_code': opportunity.agency_code, 
+ 'avg_award_amount': agency_history.avg_amount, + 'typical_award_timeline': agency_history.avg_timeline, + 'funding_seasonality': agency_history.seasonal_patterns, + 'competition_level': agency_history.avg_applicants + } + +class CompetitiveIntelProcessor(BaseEnrichmentProcessor): + name = "competitive_intel" + depends_on = ["agency_patterns", "historical_awards"] + output_tables = ["opportunity_competition"] + + def process_record(self, opportunity): + similar_opps = self.find_similar_opportunities(opportunity) + winner_patterns = self.analyze_winner_patterns(similar_opps) + + return { + 'opportunity_id': opportunity.id, + 'estimated_applicants': winner_patterns.avg_applicants, + 'win_rate_by_org_type': winner_patterns.win_rates, + 'typical_winner_profile': winner_patterns.winner_characteristics, + 'competition_score': self.calculate_competition_score(winner_patterns) + } +``` + +**Enrichment Storage Schema:** + +```sql +-- Opportunity-level enrichments +opportunity_metrics ( + opportunity_id, days_to_deadline, urgency_score, + competition_score, success_probability, + created_at, processor_version +) + +-- Agency-level enrichments +agency_metrics ( + agency_code, avg_award_amount, funding_cycles, + payment_reliability, bureaucracy_score, + created_at, processor_version +) + +-- Historical patterns +recipient_patterns ( + recipient_id, win_rate, specialties, + avg_award_size, geographic_focus, + created_at, processor_version +) +``` + +**Configuration-Driven Processing:** + +```yaml +enrichment_config: + enabled_processors: + - deadline_urgency + - agency_patterns + - competitive_intel + + processor_settings: + deadline_urgency: + urgency_thresholds: [7, 30, 90] + competitive_intel: + similarity_threshold: 0.8 + lookback_years: 3 +``` + +**Key Features:** +- **Modular**: Each processor is a self-contained unit that declares its own dependencies (`depends_on`) and output tables +- **Dependency-Aware**: Processors run in correct order +- **Versioned**: Track which version of logic created what data +- **Configurable**: Enable/disable 
processors per client +- **Reprocessable**: Can re-run enrichments with new logic +- **Incremental**: Only process new/changed records + +**Processor Registry:** +```python +class ProcessorRegistry: + processors = {} + + @classmethod + def register(cls, processor_class): + cls.processors[processor_class.name] = processor_class + + @classmethod + def get_processor(cls, name): + return cls.processors[name]() + +# Auto-discovery of processors +@ProcessorRegistry.register +class MyCustomProcessor(BaseEnrichmentProcessor): + # Implementation + pass +``` + +This interface lets you plug in any enrichment logic without touching the core pipeline. Want to see how the API layer consumes all this enriched data? + +--- + +**Core API Endpoints:** + +## **Opportunity Discovery APIs** + +``` +GET /api/v1/opportunities + - Live grant/contract opportunities + - Filters: keywords, agency, amount_range, deadline_range, location, naics, cfda + - Sort: deadline, amount, relevance_score, competition_score + - Pagination: page, per_page (echoed in the response `meta`) + - Response: opportunities + enrichment data + +GET /api/v1/opportunities/{id} + - Full opportunity details + all enrichments + - Related opportunities (similar/agency/category) + - Historical context (agency patterns, similar awards) + +GET /api/v1/opportunities/search + - Full-text search across titles/descriptions + - Semantic search capabilities + - Saved search functionality +``` + +## **Historical Intelligence APIs** + +``` +GET /api/v1/awards + - Past awards/contracts (USAspending data) + - Filters: recipient, agency, amount_range, date_range, location + - Aggregations: by_agency, by_recipient_type, by_naics + +GET /api/v1/awards/trends + - Spending trends over time + - Agency funding patterns + - Market size analysis by category + +GET /api/v1/recipients/{id}/history + - Complete award history for organization + - Success patterns, specializations + - Competitive positioning +``` + +## **Market Intelligence APIs** + +``` +GET /api/v1/agencies + - 
Agency profiles with spending patterns + - Funding cycles, preferences, reliability scores + +GET /api/v1/agencies/{code}/opportunities + - Current opportunities from specific agency + - Historical patterns, typical award sizes + +GET /api/v1/market/analysis + - Market sizing by sector/naics/keyword + - Competition density analysis + - Funding landscape overview +``` + +## **Enrichment & Scoring APIs** + +``` +GET /api/v1/opportunities/{id}/score + - Custom scoring based on client profile + - Fit score, competition score, success probability + +POST /api/v1/opportunities/batch-score + - Score multiple opportunities at once + - Client-specific scoring criteria + +GET /api/v1/competitive-intel + - Who wins what types of awards + - Success patterns by organization characteristics +``` + +## **Alert & Monitoring APIs** + +``` +POST /api/v1/alerts + - Create custom alert criteria + - Email/webhook delivery options + +GET /api/v1/alerts/{id}/results + - Recent matches for saved alert + - Historical performance of alert criteria + +POST /api/v1/watchlist + - Monitor specific agencies/programs/competitors +``` + +## **Analytics & Reporting APIs** + +``` +GET /api/v1/analytics/dashboard + - Client-specific dashboard data + - Opportunity pipeline, success metrics + +GET /api/v1/reports/market-summary + - Periodic market analysis reports + - Funding landscape changes + +POST /api/v1/reports/custom + - Generate custom analysis reports + - Export capabilities (PDF/Excel) +``` + +**API Response Format:** +```json +{ + "data": [...], + "meta": { + "total": 1250, + "page": 1, + "per_page": 50, + "filters_applied": {...}, + "data_freshness": "2024-01-15T10:30:00Z" + }, + "enrichments": { + "competition_scores": true, + "agency_patterns": true, + "deadline_urgency": true + } +} +``` + +**Authentication & Rate Limiting:** +- API key authentication +- Usage-based pricing tiers +- Rate limits by subscription level +- Client-specific data access controls + +**Key Value Props:** +- 
**Speed**: Pre-processed, indexed, ready to query +- **Intelligence**: Enriched beyond raw government data +- **Relevance**: Sophisticated filtering and scoring +- **Insights**: Historical patterns and competitive intelligence +- **Automation**: Alerts and monitoring capabilities + +This API design gives clients everything from basic opportunity search to sophisticated competitive intelligence - all the value-add layers on top of the raw government data. \ No newline at end of file