add drug parser; add iha estimation rules

This commit is contained in:
ipu 2025-07-31 22:08:02 +03:00
parent bf1d988d36
commit 80916f6c3e
10 changed files with 1271 additions and 15 deletions

View file

@ -1,8 +1,9 @@
import httpx
import json
from typing import Dict, Any, List, Optional
from ..config import settings
from .session_service import session_service
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timedelta
from src.config import settings
from src.services.session_service import session_service
class EstimationService:
"""Service for handling insurance estimation via TALESTORM API"""
@ -11,6 +12,331 @@ class EstimationService:
self.base_url = settings.TALESTORM_API_BASE_URL
self.api_key = settings.TALESTORM_API_KEY
self.agent_id = settings.TALESTORM_ESTIMATION_AGENT_ID
self.insurance_api_base_url = settings.VIRGIL_API_BASE_URL
# Initialize uninsurable conditions and medications
self._initialize_underwriting_guidelines()
def _initialize_underwriting_guidelines(self):
"""Initialize uninsurable conditions and medications based on IHA guidelines"""
# Permanently uninsurable conditions
self.permanently_uninsurable_conditions = {
"organ_transplant", "amputation_disease", "emphysema", "chronic_pulmonary",
"oxygen_use", "nebulizer_use", "parkinsons", "multiple_sclerosis", "als",
"lupus", "myasthenia_gravis", "alzheimers", "dementia", "cognitive_disorders",
"aids", "arc", "hiv"
}
# Conditions uninsurable if diagnosed/treated in past 5 years
self.recent_uninsurable_conditions = {
"coronary_artery_disease", "heart_failure", "pacemaker", "defibrillator",
"enlarged_heart", "valve_surgery", "stroke", "tia", "carotid_disease",
"peripheral_vascular_disease", "rheumatoid_arthritis", "crippling_arthritis",
"cancer", "kidney_disease", "liver_disease", "hepatitis", "cirrhosis",
"alcoholism", "drug_abuse", "mental_nervous_hospitalization",
"heart_rhythm_disorders", "osteoporosis_fractures"
}
# Uninsurable medications (partial list)
self.uninsurable_medications = {
# HIV/AIDS medications
"abacavir", "lamivudine", "zidovudine", "tenofovir", "efavirenz", "nevirapine",
"atazanavir", "darunavir", "raltegravir", "dolutegravir",
# Cancer medications
"abarelix", "chlorambucil", "tamoxifen", "anastrozole", "letrozole",
"exemestane", "fulvestrant", "bicalutamide", "flutamide", "goserelin",
"leuprolide", "triptorelin", "bexarotene", "vorinostat", "romidepsin",
# Severe arthritis medications
"adalimumab", "methotrexate", "infliximab", "etanercept", "golimumab",
"certolizumab", "ustekinumab", "secukinumab", "ixekizumab", "guselkumab",
"tildrakizumab", "risankizumab", "upadacitinib", "tofacitinib", "baricitinib",
# Dementia medications
"donepezil", "memantine", "galantamine", "rivastigmine", "tacrine",
# Schizophrenia/psychosis medications
"aripiprazole", "olanzapine", "risperidone", "quetiapine", "ziprasidone",
"clozapine", "paliperidone", "asenapine", "iloperidone", "lurasidone",
"cariprazine", "brexpiprazole", "lumateperone",
# Parkinson's disease medications
"carbidopa", "levodopa", "selegiline", "rasagiline", "entacapone",
"tolcapone", "pramipexole", "ropinirole", "rotigotine", "apomorphine",
"amantadine", "trihexyphenidyl", "benztropine",
# Multiple sclerosis medications
"glatiramer", "interferon_beta", "natalizumab", "fingolimod", "teriflunomide",
"dimethyl_fumarate", "alemtuzumab", "ocrelizumab", "cladribine", "siponimod",
"ozanimod", "ponesimod",
# Severe cardiovascular disease medications
"amiodarone", "warfarin", "dabigatran", "rivaroxaban", "apixaban",
"edoxaban", "clopidogrel", "ticagrelor", "prasugrel", "dipyridamole",
"cilostazol", "pentoxifylline"
}
# Height and weight thresholds for premium categories
self.height_weight_chart = {
"5'0": {"preferred": (90, 102), "standard": (103, 218), "high": (219, 260)},
"5'1": {"preferred": (93, 105), "standard": (106, 221), "high": (222, 264)},
"5'2": {"preferred": (96, 108), "standard": (109, 224), "high": (225, 268)},
"5'3": {"preferred": (99, 111), "standard": (112, 227), "high": (228, 272)},
"5'4": {"preferred": (102, 114), "standard": (115, 230), "high": (231, 276)},
"5'5": {"preferred": (105, 117), "standard": (118, 233), "high": (234, 280)},
"5'6": {"preferred": (108, 120), "standard": (121, 236), "high": (237, 284)},
"5'7": {"preferred": (111, 123), "standard": (124, 239), "high": (240, 288)},
"5'8": {"preferred": (114, 126), "standard": (127, 242), "high": (243, 292)},
"5'9": {"preferred": (117, 129), "standard": (130, 245), "high": (246, 296)},
"5'10": {"preferred": (120, 132), "standard": (133, 248), "high": (249, 300)},
"5'11": {"preferred": (123, 135), "standard": (136, 251), "high": (252, 304)},
"6'0": {"preferred": (126, 138), "standard": (139, 254), "high": (255, 308)},
"6'1": {"preferred": (129, 141), "standard": (142, 257), "high": (258, 312)},
"6'2": {"preferred": (132, 144), "standard": (145, 260), "high": (261, 316)},
"6'3": {"preferred": (135, 147), "standard": (148, 263), "high": (264, 320)},
"6'4": {"preferred": (138, 150), "standard": (151, 266), "high": (267, 324)},
"6'5": {"preferred": (141, 153), "standard": (154, 269), "high": (270, 328)},
"6'6": {"preferred": (144, 156), "standard": (157, 272), "high": (273, 332)}
}
def _check_uninsurable_conditions(self, phq: Dict[str, Any]) -> List[str]:
"""Check for uninsurable conditions in PHQ data"""
issues = []
if not phq:
return issues
# Check conditions
for condition in phq.get("conditions", []):
condition_desc = condition.get("description", "").lower()
condition_key = condition.get("key", "").lower()
# Check for permanently uninsurable conditions
for uninsurable in self.permanently_uninsurable_conditions:
if uninsurable in condition_desc or uninsurable in condition_key:
issues.append(f"Permanently uninsurable condition: {condition.get('description', '')}")
# Check for recent uninsurable conditions (past 5 years)
for recent in self.recent_uninsurable_conditions:
if recent in condition_desc or recent in condition_key:
issues.append(f"Recent uninsurable condition (past 5 years): {condition.get('description', '')}")
# Check health issues
for issue in phq.get("issues", []):
issue_key = issue.get("key", "").lower()
issue_desc = " ".join([detail.get("description", "") for detail in issue.get("details", [])]).lower()
# Check for uninsurable issues
for uninsurable in self.permanently_uninsurable_conditions:
if uninsurable in issue_desc or uninsurable in issue_key:
issues.append(f"Permanently uninsurable health issue: {issue.get('key', '')}")
for recent in self.recent_uninsurable_conditions:
if recent in issue_desc or recent in issue_key:
issues.append(f"Recent uninsurable health issue (past 5 years): {issue.get('key', '')}")
return issues
def _check_uninsurable_medications(self, phq: Dict[str, Any]) -> List[str]:
"""Check for uninsurable medications in PHQ data"""
issues = []
if not phq:
return issues
for medication in phq.get("medications", []):
med_name = medication.get("name", "").lower()
# Check against uninsurable medication list
for uninsurable_med in self.uninsurable_medications:
if uninsurable_med in med_name:
issues.append(f"Uninsurable medication: {medication.get('name', '')}")
break
return issues
def _check_diabetes_risk_factors(self, phq: Dict[str, Any]) -> List[str]:
"""Check for diabetes-related uninsurability factors"""
issues = []
if not phq:
return issues
# Count diabetes and hypertension medications
diabetes_meds = []
hypertension_meds = []
for medication in phq.get("medications", []):
med_name = medication.get("name", "").lower()
# Diabetes medications
if any(dm_med in med_name for dm_med in ["insulin", "metformin", "glipizide", "glyburide", "glimepiride", "sitagliptin", "saxagliptin", "linagliptin", "alogliptin", "empagliflozin", "dapagliflozin", "canagliflozin", "dulaglutide", "liraglutide", "semaglutide", "exenatide"]):
diabetes_meds.append(medication.get("name", ""))
# Hypertension medications
if any(htn_med in med_name for htn_med in ["lisinopril", "enalapril", "ramipril", "quinapril", "benazepril", "fosinopril", "trandolapril", "moexipril", "perindopril", "losartan", "valsartan", "irbesartan", "candesartan", "olmesartan", "telmisartan", "eprosartan", "azilsartan", "amlodipine", "nifedipine", "felodipine", "isradipine", "nicardipine", "nifedipine", "diltiazem", "verapamil", "atenolol", "metoprolol", "propranolol", "carvedilol", "nebivolol", "bisoprolol", "hydrochlorothiazide", "chlorthalidone", "indapamide", "furosemide", "bumetanide", "torsemide", "spironolactone", "eplerenone", "doxazosin", "terazosin", "prazosin", "clonidine", "methyldopa", "hydralazine", "minoxidil"]):
hypertension_meds.append(medication.get("name", ""))
# Check for diabetes risk factors
if len(diabetes_meds) >= 3:
issues.append(f"High diabetes medication count ({len(diabetes_meds)}): {', '.join(diabetes_meds)}")
if len(hypertension_meds) >= 3:
issues.append(f"High hypertension medication count ({len(hypertension_meds)}): {', '.join(hypertension_meds)}")
# Check for insulin use > 50 units/day
for medication in phq.get("medications", []):
if "insulin" in medication.get("name", "").lower():
dosage = medication.get("dosage", "")
frequency = medication.get("frequency", "")
# Simple check for high insulin dosage (this would need more sophisticated parsing)
if "50" in dosage or "100" in dosage:
issues.append(f"High insulin dosage detected: {medication.get('name', '')} {dosage} {frequency}")
return issues
def _check_height_weight_eligibility(self, applicant: Dict[str, Any]) -> Tuple[str, str]:
"""Check height/weight eligibility and determine premium category"""
height_ft = applicant.get("heightFt", 0)
height_in = applicant.get("heightIn", 0)
weight = applicant.get("weight", 0)
if not all([height_ft, height_in, weight]):
return "unknown", "Missing height or weight information"
# Format height as "5'6" style
height_key = f"{height_ft}'{height_in}"
if height_key not in self.height_weight_chart:
return "unknown", f"Height {height_key} not in eligibility chart"
thresholds = self.height_weight_chart[height_key]
if weight < thresholds["preferred"][0]:
return "underweight", f"Weight {weight} lbs below minimum for height {height_key}"
elif weight <= thresholds["preferred"][1]:
return "preferred", "Eligible for preferred premium"
elif weight <= thresholds["standard"][1]:
return "standard", "Standard premium category"
elif weight <= thresholds["high"][1]:
return "high", "High premium category due to weight"
else:
return "uninsurable", f"Weight {weight} lbs exceeds maximum for height {height_key}"
return "unknown", "Unable to determine eligibility"
def _calculate_bmi(self, applicant: Dict[str, Any]) -> float:
"""Calculate BMI for an applicant"""
height_ft = applicant.get("heightFt", 0)
height_in = applicant.get("heightIn", 0)
weight = applicant.get("weight", 0)
if not all([height_ft, height_in, weight]):
return 0.0
# Convert to total inches
total_inches = (height_ft * 12) + height_in
# Convert to meters
height_m = total_inches * 0.0254
# Convert weight to kg
weight_kg = weight * 0.453592
# Calculate BMI
if height_m > 0:
return weight_kg / (height_m * height_m)
return 0.0
def _validate_application(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""Perform comprehensive IHA underwriting validation"""
validation_result = {
"is_eligible": True,
"issues": [],
"warnings": [],
"applicant_validations": []
}
applicants = request_data.get("applicants", [])
phq = request_data.get("phq", {})
# Check for uninsurable conditions
condition_issues = self._check_uninsurable_conditions(phq)
validation_result["issues"].extend(condition_issues)
# Check for uninsurable medications
medication_issues = self._check_uninsurable_medications(phq)
validation_result["issues"].extend(medication_issues)
# Check diabetes risk factors
diabetes_issues = self._check_diabetes_risk_factors(phq)
validation_result["issues"].extend(diabetes_issues)
# Validate each applicant
for i, applicant in enumerate(applicants):
applicant_validation = {
"applicant_index": i,
"name": f"{applicant.get('firstName', '')} {applicant.get('lastName', '')}",
"is_eligible": True,
"issues": [],
"warnings": [],
"bmi": 0.0,
"premium_category": "unknown"
}
# Calculate BMI
bmi = self._calculate_bmi(applicant)
applicant_validation["bmi"] = bmi
# Check height/weight eligibility
premium_category, message = self._check_height_weight_eligibility(applicant)
applicant_validation["premium_category"] = premium_category
if premium_category == "uninsurable":
applicant_validation["is_eligible"] = False
applicant_validation["issues"].append(f"Height/weight: {message}")
elif premium_category == "underweight":
applicant_validation["warnings"].append(f"Height/weight: {message}")
elif premium_category == "high":
applicant_validation["warnings"].append(f"Height/weight: {message}")
# Check for nicotine use
if applicant.get("nicotine"):
applicant_validation["warnings"].append("Nicotine use detected - may affect premium")
# Check age
try:
dob = applicant.get("dob", "")
if dob:
# Parse date of birth (assuming format like "dd/mm/yyyy")
if "/" in dob:
day, month, year = dob.split("/")
birth_date = datetime(int(year), int(month), int(day))
age = (datetime.now() - birth_date).days // 365
if age > 80:
applicant_validation["warnings"].append(f"Advanced age ({age} years) may affect eligibility")
elif age < 18:
applicant_validation["is_eligible"] = False
applicant_validation["issues"].append("Applicant must be 18 or older")
except:
applicant_validation["warnings"].append("Unable to determine age from date of birth")
validation_result["applicant_validations"].append(applicant_validation)
# If any applicant is ineligible, mark overall as ineligible
if not applicant_validation["is_eligible"]:
validation_result["is_eligible"] = False
# If there are any critical issues, mark as ineligible
if validation_result["issues"]:
validation_result["is_eligible"] = False
return validation_result
async def get_client(self) -> httpx.AsyncClient:
"""Get HTTP client for TALESTORM API"""
@ -26,8 +352,28 @@ class EstimationService:
headers=headers
)
def _format_estimation_request(self, request_data: Dict[str, Any]) -> str:
"""Format the estimation request as a natural language message for the AI"""
async def get_plan_details(self, plan_id: str) -> Optional[Dict[str, Any]]:
"""Fetch plan details from the insurance API"""
try:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.insurance_api_base_url}/insurance/plans/{plan_id}",
headers={"accept": "application/json"}
)
if response.status_code == 200:
return response.json()
else:
print(f"Failed to fetch plan {plan_id}: {response.status_code} {response.text}")
return None
except Exception as e:
print(f"Error fetching plan {plan_id}: {str(e)}")
return None
async def _format_estimation_request_with_plan_details(self, request_data: Dict[str, Any]) -> str:
"""Format the estimation request with fetched plan details"""
applicants = request_data.get("applicants", [])
plans = request_data.get("plans", [])
phq = request_data.get("phq", {})
@ -48,10 +394,30 @@ class EstimationService:
message_parts.append(f" Height: {applicant.get('heightFt', '')}'{applicant.get('heightIn', '')}\"")
message_parts.append(f" Nicotine use: {'Yes' if applicant.get('nicotine') else 'No'}")
# Add plan information
# Fetch and add detailed plan information for all plans
if plans:
plan = plans[0]
message_parts.append(f"\nPlan: Coverage type {plan.get('coverage', '')}")
message_parts.append("\nPlan Details:")
for i, plan in enumerate(plans):
plan_id = plan.get('id')
if plan_id:
plan_details = await self.get_plan_details(plan_id)
if plan_details:
message_parts.append(f"\nPlan {i+1}:")
message_parts.append(f"- Plan Name: {plan_details.get('name', 'N/A')}")
message_parts.append(f"- Coverage Type: {plan_details.get('coverage_type', 'N/A')}")
message_parts.append(f"- Deductible: ${plan_details.get('deductible', 0):,.2f}")
message_parts.append(f"- Premium: ${plan_details.get('premium', 0):,.2f}")
message_parts.append(f"- Coinsurance: {plan_details.get('coinsurance', 0)}%")
message_parts.append(f"- Copay: ${plan_details.get('copay', 0):,.2f}")
if plan_details.get('benefits'):
message_parts.append("- Benefits:")
for benefit in plan_details['benefits']:
message_parts.append(f" * {benefit.get('name', '')}: {benefit.get('description', '')}")
else:
# Fallback to basic plan info if API call fails
message_parts.append(f"\nPlan {i+1}: Coverage type {plan.get('coverage', '')}")
else:
message_parts.append(f"\nPlan {i+1}: Coverage type {plan.get('coverage', '')}")
# Add PHQ information
if phq:
@ -123,11 +489,28 @@ class EstimationService:
async def estimate_insurance(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""Send estimation request to TALESTORM API and parse response"""
try:
# Perform IHA underwriting validation first
validation_result = self._validate_application(request_data)
# If application is not eligible, return rejection immediately
if not validation_result["is_eligible"]:
return {
"status": "rejected",
"details": {
"dtq": False,
"reason": f"Application not eligible: {'; '.join(validation_result['issues'])}",
"tier": 4,
"total_price": 0.0
},
"results": [],
"validation": validation_result
}
# Create or get existing session with estimation agent
session_id = await session_service.create_session(agent_id=self.agent_id)
# Format the request as a natural language message
estimation_message = self._format_estimation_request(request_data)
# Format the request as a natural language message with plan details
estimation_message = await self._format_estimation_request_with_plan_details(request_data)
# Send request to TALESTORM API
async with await self.get_client() as client:
@ -194,6 +577,15 @@ class EstimationService:
if field not in result:
raise ValueError(f"Missing required result field: {field}")
# Add validation information to the response
parsed_response["validation"] = validation_result
# If there are warnings, add them to the response
if validation_result["warnings"]:
if "warnings" not in parsed_response:
parsed_response["warnings"] = []
parsed_response["warnings"].extend(validation_result["warnings"])
return parsed_response
except (json.JSONDecodeError, ValueError) as e:
@ -208,7 +600,8 @@ class EstimationService:
"tier": 4,
"total_price": 0.0
},
"results": []
"results": [],
"validation": validation_result
}
except Exception as e:
@ -221,7 +614,8 @@ class EstimationService:
"tier": 4,
"total_price": 0.0
},
"results": []
"results": [],
"validation": validation_result if 'validation_result' in locals() else None
}
# Global estimation service instance

View file

@ -1,7 +1,7 @@
import httpx
import uuid
from typing import Optional, Dict, Any, List
from ..config import settings
from src.config import settings
class SessionService:
"""Service for managing chat sessions with talestorm-ai"""