add insurance processor script

This commit is contained in:
ipu 2025-07-28 20:33:11 +03:00
parent 4f82307cd9
commit bf1d988d36
21 changed files with 372 additions and 1552 deletions

View file

@ -0,0 +1,210 @@
#!/usr/bin/env python3
"""
Insurance Plan Processor
This script fetches insurance plans from the API endpoint and processes them through the RAG system.
Each plan is parsed into a dictionary and sent to the RAG processing endpoint.
"""
import json
import httpx
import time
from typing import Dict, List, Any
import logging
from config import settings
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class InsurancePlanProcessor:
def __init__(self, talestorm_api_base: str, api_key: str):
"""
Initialize the processor.
Args:
talestorm_api_base: Base URL of the Talestorm API (e.g., "http://localhost:8000")
api_key: API key for authentication
"""
self.talestorm_api_base = talestorm_api_base.rstrip('/')
self.api_key = api_key
self.client = httpx.Client(
headers={
'X-API-Key': api_key
}
)
def fetch_insurance_plans(self) -> List[Dict[str, Any]]:
"""
Fetch insurance plans from the API endpoint.
Returns:
List of insurance plan dictionaries
"""
url = "https://api-virgil.liambo.ai/insurance/plans/full"
try:
logger.info("Fetching insurance plans from API...")
response = self.client.get(url)
response.raise_for_status()
plans = response.json()
logger.info(f"Successfully fetched {len(plans)} insurance plans")
return plans
except httpx.RequestError as e:
logger.error(f"Error fetching insurance plans: {e}")
raise
def process_plan_to_rag(self, plan: Dict[str, Any]) -> bool:
"""
Process a single insurance plan through the RAG endpoint.
Args:
plan: Insurance plan dictionary
Returns:
True if successful, False otherwise
"""
try:
# Convert plan to JSON string
plan_json = json.dumps(plan, indent=2)
plan_name = f"insurance_plan_{plan.get('name', 'unnamed').replace(' ', '_')}"
# Create a file-like object for the upload
import io
file_data = io.BytesIO(plan_json.encode('utf-8'))
# Prepare the multipart form data with proper file format
files = {
'file': (f'{plan_name}.txt', file_data, 'text/plain')
}
# Ensure file pointer is at the beginning
file_data.seek(0)
# Add the plan name as a parameter
data = {
'name': plan_name
}
# Send to RAG processing endpoint
url = f"{self.talestorm_api_base}/rag/process?name={plan_name}"
logger.info(f"Processing plan: {plan_name}")
# Send multipart form data
response = self.client.post(
url,
files=files,
timeout=60.0
)
if response.status_code == 200:
result = response.json()
logger.info(f"Successfully processed plan {plan.get('name', 'Unknown')} - Document name: {result.get('name', 'Unknown')} - {result.get('chunks_count', 0)} chunks created")
return True
else:
logger.error(f"Failed to process plan {plan.get('name', 'Unknown')}: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"Error processing plan {plan.get('name', 'Unknown')}: {e}")
return False
finally:
# Reset file pointer for potential reuse
file_data.seek(0)
def process_all_plans(self, delay_between_requests: float = 1.0) -> Dict[str, int]:
"""
Process all insurance plans through the RAG system.
Args:
delay_between_requests: Delay in seconds between processing requests
Returns:
Dictionary with processing statistics
"""
try:
# Fetch all plans
plans = self.fetch_insurance_plans()
if not plans:
logger.warning("No insurance plans found")
return {'total': 0, 'successful': 0, 'failed': 0}
successful = 0
failed = 0
logger.info(f"Starting to process {len(plans)} insurance plans...")
for i, plan in enumerate(plans, 1):
logger.info(f"Processing plan {i}/{len(plans)}: {plan.get('name', 'Unknown')}")
if self.process_plan_to_rag(plan):
successful += 1
else:
failed += 1
# Add delay between requests to avoid overwhelming the server
if i < len(plans): # Don't delay after the last request
time.sleep(delay_between_requests)
stats = {
'total': len(plans),
'successful': successful,
'failed': failed
}
logger.info(f"Processing complete! {successful} successful, {failed} failed out of {len(plans)} total plans")
return stats
except Exception as e:
logger.error(f"Error in process_all_plans: {e}")
raise
finally:
# Close the client
self.client.close()
def main():
"""
Main function to run the insurance plan processor.
"""
import sys
# Configuration from config.py
TALESTORM_API_BASE = settings.TALESTORM_API_BASE_URL
API_KEY = settings.TALESTORM_API_KEY
DELAY_BETWEEN_REQUESTS = float(getattr(settings, 'DELAY_BETWEEN_REQUESTS', '1.0'))
try:
# Create processor instance
processor = InsurancePlanProcessor(TALESTORM_API_BASE, API_KEY)
# Process all plans
stats = processor.process_all_plans(delay_between_requests=DELAY_BETWEEN_REQUESTS)
# Print final statistics
print("\n" + "="*50)
print("PROCESSING SUMMARY")
print("="*50)
print(f"Total plans processed: {stats['total']}")
print(f"Successful: {stats['successful']}")
print(f"Failed: {stats['failed']}")
print(f"Success rate: {(stats['successful']/stats['total']*100):.1f}%" if stats['total'] > 0 else "No plans processed")
print("="*50)
if stats['failed'] > 0:
sys.exit(1)
except Exception as e:
logger.error(f"Script failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()