initial commit

This commit is contained in:
ipu 2025-07-25 14:01:16 +03:00
commit aaba8753ef
36 changed files with 3682 additions and 0 deletions

1
src/__init__.py Normal file
View file

@ -0,0 +1 @@
# Lolly AI - Insurance AI Assistant

5
src/api/__init__.py Normal file
View file

@ -0,0 +1,5 @@
from fastapi import APIRouter
from .v1.router import router as v1_router
api_router = APIRouter()
api_router.include_router(v1_router)

5
src/api/v1/__init__.py Normal file
View file

@ -0,0 +1,5 @@
from fastapi import APIRouter
from .router import router
api_router = APIRouter()
api_router.include_router(router)

56
src/api/v1/models.py Normal file
View file

@ -0,0 +1,56 @@
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
class InsuranceChatRequest(BaseModel):
message: str = Field(..., description="User message")
session_id: Optional[str] = Field(None, description="Chat session ID")
class Source(BaseModel):
plan_name: str
chunk_number: int
content_chunk: str
class HistoryItem(BaseModel):
role: str
message: str
class InsuranceChatResponse(BaseModel):
session_id: str
answer: str
sources: List[Source] = []
history: List[HistoryItem] = []
class EstimationRequest(BaseModel):
uid: str = Field(..., description="Application unique identifier")
applicants: List[Dict[str, Any]] = Field(..., description="List of applicants")
plans: List[Dict[str, Any]] = Field(..., description="List of insurance plans")
phq: Dict[str, Any] = Field(..., description="Personal Health Questionnaire data")
income: Optional[float] = Field(0, description="Annual income")
address: Optional[Dict[str, Any]] = Field({}, description="Address information")
class EstimationResponse(BaseModel):
uid: str
status: str
data: Dict[str, Any]
external: Optional[Dict[str, Any]] = None
class SessionCreateResponse(BaseModel):
session_id: str
class SessionResponse(BaseModel):
id: str
organization_id: Optional[str] = None
agent_id: Optional[str] = None
created_at: Optional[str] = None
class SessionsListResponse(BaseModel):
sessions: List[SessionResponse]
class AgentsListResponse(BaseModel):
agents: List[Dict[str, Any]]
class HealthResponse(BaseModel):
status: str
service: str
version: str
dependencies: Dict[str, str]

104
src/api/v1/router.py Normal file
View file

@ -0,0 +1,104 @@
from fastapi import APIRouter, HTTPException
from typing import Dict, Any
import httpx
from . import models
from ...services.chat_service import chat_service
from ...services.estimation_service import run_underwriting
from ...config import settings
router = APIRouter(prefix="/v1", tags=["v1"])
@router.post("/insurance_chat", response_model=models.InsuranceChatResponse)
async def insurance_chat(request: models.InsuranceChatRequest):
"""Handle insurance chat requests"""
try:
result = await chat_service.process_insurance_chat(
message=request.message,
session_id=request.session_id
)
return models.InsuranceChatResponse(
session_id=result["session_id"],
answer=result["answer"],
sources=result["sources"],
history=result["history"]
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing chat request: {str(e)}")
@router.post("/estimation", response_model=models.EstimationResponse)
async def estimate(request: models.EstimationRequest):
"""Handle insurance estimation requests"""
try:
# Validate required fields
if not request.applicants or not request.plans:
raise HTTPException(
status_code=400,
detail="Missing required applicants or plans"
)
# Step 1: Run estimation
underwriting_result = run_underwriting(request.applicants, request.phq, request.plans)
# Step 2: If DTQ → reject application
if underwriting_result["combined"].get("dtq"):
reject_response = await reject_application(request.uid)
return models.EstimationResponse(
uid=request.uid,
status="rejected",
data=underwriting_result,
external=reject_response
)
# Step 3: Else → assign tier and submit
final_tier = underwriting_result["combined"]["tier"]
plans = request.plans.copy()
if plans:
plans[0]["tier"] = f"tier_{str(final_tier).replace('.', '_')}"
# Assemble external payload
submission_payload = {
"applicants": request.applicants,
"plans": plans,
"phq": request.phq,
"income": request.income,
"address": request.address
}
submit_response = await submit_application(submission_payload)
return models.EstimationResponse(
uid=request.uid,
status="submitted",
data=underwriting_result,
external=submit_response
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Server error: {str(e)}"
)
async def reject_application(uid: str) -> Dict[str, Any]:
"""Reject application via external API"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{settings.INSURANCE_API_BASE_URL}/applications/reject",
json={"applicationId": uid}
)
return response.json() if response.status_code == 200 else {"error": "Failed to reject application"}
async def submit_application(application_payload: Dict[str, Any]) -> Dict[str, Any]:
"""Submit application via external API"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{settings.INSURANCE_API_BASE_URL}/applications/submit",
json=application_payload
)
return response.json() if response.status_code == 200 else {"error": "Failed to submit application"}

19
src/config.py Normal file
View file

@ -0,0 +1,19 @@
from pathlib import Path
from typing import Optional
from pydantic_settings import BaseSettings, SettingsConfigDict
env_path = (Path(__file__).parent.parent / '.env').absolute()
class Settings(BaseSettings):
model_config = SettingsConfigDict(extra='ignore', env_file=env_path, env_file_encoding='utf-8')
INSURANCE_API_BASE_URL: str
TALESTORM_API_BASE_URL: str = ""
TALESTORM_API_KEY: str
TALESTORM_AGENT_ID: str
# Global settings instance
settings = Settings(_env_file=env_path, _env_file_encoding='utf-8')

1
src/examples/__init__.py Normal file
View file

@ -0,0 +1 @@
# Examples package

View file

@ -0,0 +1,178 @@
#!/usr/bin/env python3
"""
Example usage of the Lolly AI API
"""
import httpx
import asyncio
from typing import Dict, Any
from src.config import settings
# Configuration
API_BASE_URL = "http://localhost:7310"
async def test_session_creation():
"""Test session creation with configured agent"""
print("Testing Session Creation...")
print(f"Using configured agent ID: {settings.TALESTORM_AGENT_ID}")
async with httpx.AsyncClient() as client:
try:
response = await client.post(f"{API_BASE_URL}/api/v1/sessions")
if response.status_code == 200:
data = response.json()
session_id = data.get("session_id")
print(f"✅ Session created: {session_id}")
return session_id
else:
print(f"❌ Failed to create session: {response.status_code}")
return None
except Exception as e:
print(f"❌ Error creating session: {e}")
return None
async def test_insurance_chat():
"""Test the insurance chat endpoint with new format"""
print("Testing Insurance Chat...")
async with httpx.AsyncClient() as client:
# First create a session
session_id = await test_session_creation()
payload = {
"message": "What does health insurance typically cover?",
"session_id": session_id
}
try:
response = await client.post(f"{API_BASE_URL}/api/v1/insurance_chat", json=payload)
if response.status_code == 200:
data = response.json()
print(f"✅ Chat successful!")
print(f"Session ID: {data['session_id']}")
print(f"Answer: {data['answer']}")
# Show sources if any
if data['sources']:
print(f"Sources found: {len(data['sources'])}")
for i, source in enumerate(data['sources']):
print(f" Source {i+1}:")
print(f" Plan: {source['plan_name']}")
print(f" Chunk: {source['chunk_number']}")
print(f" Content: {source['content_chunk'][:100]}...")
else:
print("No sources found (RAG not implemented yet)")
# Show history in detail
print(f"\nHistory entries: {len(data['history'])}")
for i, history_item in enumerate(data['history']):
role_emoji = "👤" if history_item['role'] == "user" else "🤖"
print(f" {role_emoji} {history_item['role'].upper()}: {history_item['message'][:80]}...")
# Show full message if it's short
if len(history_item['message']) <= 80:
print(f" Full message: {history_item['message']}")
else:
print(f"❌ Chat failed with status {response.status_code}")
print(f"Error: {response.text}")
except Exception as e:
print(f"❌ Error testing chat: {e}")
async def test_multiple_messages():
"""Test sending multiple messages to the same session"""
print("\nTesting Multiple Messages...")
async with httpx.AsyncClient() as client:
# Create a session
session_id = await test_session_creation()
if not session_id:
print("❌ Could not create session for multiple message test")
return
# Send multiple messages
messages = [
"What is health insurance?",
"What types of coverage are available?",
"How much does insurance typically cost?"
]
for i, message in enumerate(messages, 1):
print(f"\nMessage {i}: {message}")
payload = {
"message": message,
"session_id": session_id
}
try:
response = await client.post(f"{API_BASE_URL}/api/v1/insurance_chat", json=payload)
if response.status_code == 200:
data = response.json()
print(f" ✅ Response: {data['answer'][:60]}...")
print(f" 📝 History entries: {len(data['history'])}")
# Show the last few history items
recent_history = data['history'][-4:] # Show last 4 items
for item in recent_history:
role_emoji = "👤" if item['role'] == "user" else "🤖"
print(f" {role_emoji} {item['role']}: {item['message'][:40]}...")
else:
print(f" ❌ Failed: {response.status_code}")
except Exception as e:
print(f" ❌ Error: {e}")
async def test_estimation():
"""Test the estimation endpoint"""
print("Testing Insurance Estimation...")
payload = {
"uid": "test-application-123",
"applicants": [
{
"firstName": "John",
"dob": "15/03/1985",
"weight": 70,
"heightFt": 5,
"heightIn": 10,
"applicant": 1
}
],
"plans": [{"coverage": 1}],
"phq": {
"effectiveDate": "01/01/2024",
"medications": [],
"conditions": [],
"issues": []
},
"income": 50000,
"address": {}
}
async with httpx.AsyncClient() as client:
try:
response = await client.post(f"{API_BASE_URL}/api/v1/estimation", json=payload)
if response.status_code == 200:
data = response.json()
print(f"✅ Estimation successful!")
print(f"Application ID: {data['uid']}")
print(f"Status: {data['status']}")
else:
print(f"❌ Estimation failed with status {response.status_code}")
print(f"Error: {response.text}")
except Exception as e:
print(f"❌ Error testing estimation: {e}")
async def main():
"""Run all tests"""
print("🚀 Starting Lolly AI API Tests")
print("=" * 50)
await test_insurance_chat()
print("\n" + "=" * 50)
await test_multiple_messages()
print("\n" + "=" * 50)
await test_estimation()
print("\n✅ All tests completed!")
if __name__ == "__main__":
asyncio.run(main())

View file

@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""
Example of session management with Lolly AI
"""
import httpx
import asyncio
from typing import Dict, Any
# Configuration
API_BASE_URL = "http://localhost:7310"
async def demonstrate_session_management():
"""Demonstrate session management functionality"""
print("🔄 Session Management Demo")
print("=" * 50)
async with httpx.AsyncClient() as client:
try:
# Step 1: List available agents
print("1. Listing available agents...")
response = await client.get(f"{API_BASE_URL}/api/v1/agents")
if response.status_code == 200:
agents = response.json()["agents"]
print(f" ✅ Found {len(agents)} agents")
for agent in agents:
print(f" - {agent.get('name', 'Unknown')} ({agent.get('model_name', 'Unknown')})")
else:
print(f" ❌ Failed to list agents: {response.status_code}")
# Step 2: Create a new session
print("\n2. Creating a new session...")
response = await client.post(f"{API_BASE_URL}/api/v1/sessions")
if response.status_code == 200:
session_data = response.json()
session_id = session_data["session_id"]
print(f" ✅ Session created: {session_id}")
else:
print(f" ❌ Failed to create session: {response.status_code}")
return
# Step 3: Get session details
print(f"\n3. Getting session details for {session_id}...")
response = await client.get(f"{API_BASE_URL}/api/v1/sessions/{session_id}")
if response.status_code == 200:
session_details = response.json()
print(f" ✅ Session details retrieved")
print(f" - Agent ID: {session_details.get('agent_id', 'Unknown')}")
print(f" - Created: {session_details.get('created_at', 'Unknown')}")
else:
print(f" ❌ Failed to get session details: {response.status_code}")
# Step 4: Send a chat message using the session
print(f"\n4. Sending chat message using session {session_id}...")
chat_payload = {
"message": "Hello! I have a question about health insurance coverage.",
"session_id": session_id
}
response = await client.post(f"{API_BASE_URL}/api/v1/insurance_chat", json=chat_payload)
if response.status_code == 200:
chat_response = response.json()
print(f" ✅ Chat message sent successfully")
print(f" - Response: {chat_response['answer'][:100]}...")
else:
print(f" ❌ Failed to send chat message: {response.status_code}")
# Step 5: List all sessions
print(f"\n5. Listing all sessions...")
response = await client.get(f"{API_BASE_URL}/api/v1/sessions")
if response.status_code == 200:
sessions = response.json()["sessions"]
print(f" ✅ Found {len(sessions)} sessions")
for session in sessions:
print(f" - Session {session.get('id', 'Unknown')} (Agent: {session.get('agent_id', 'Unknown')})")
else:
print(f" ❌ Failed to list sessions: {response.status_code}")
# Step 6: Send another message to the same session
print(f"\n6. Sending follow-up message to session {session_id}...")
follow_up_payload = {
"message": "Can you tell me more about dental coverage?",
"session_id": session_id
}
response = await client.post(f"{API_BASE_URL}/api/v1/insurance_chat", json=follow_up_payload)
if response.status_code == 200:
chat_response = response.json()
print(f" ✅ Follow-up message sent successfully")
print(f" - Response: {chat_response['answer'][:100]}...")
print(f" - History length: {len(chat_response['history'])}")
else:
print(f" ❌ Failed to send follow-up message: {response.status_code}")
except Exception as e:
print(f"❌ Error during session management demo: {e}")
async def demonstrate_session_persistence():
"""Demonstrate session persistence across multiple requests"""
print("\n🔄 Session Persistence Demo")
print("=" * 50)
async with httpx.AsyncClient() as client:
try:
# Create a session
response = await client.post(f"{API_BASE_URL}/api/v1/sessions")
if response.status_code != 200:
print("❌ Failed to create session for persistence demo")
return
session_id = response.json()["session_id"]
print(f"Created session: {session_id}")
# Send multiple messages to the same session
messages = [
"What is health insurance?",
"What types of coverage are available?",
"How much does insurance typically cost?",
"What is a deductible?"
]
for i, message in enumerate(messages, 1):
print(f"\nMessage {i}: {message}")
payload = {
"message": message,
"session_id": session_id
}
response = await client.post(f"{API_BASE_URL}/api/v1/insurance_chat", json=payload)
if response.status_code == 200:
chat_response = response.json()
print(f" Response: {chat_response['answer'][:80]}...")
print(f" History entries: {len(chat_response['history'])}")
else:
print(f" ❌ Failed to send message: {response.status_code}")
print(f"\n✅ Session persistence demo completed for session: {session_id}")
except Exception as e:
print(f"❌ Error during session persistence demo: {e}")
async def main():
"""Run session management demonstrations"""
await demonstrate_session_management()
await demonstrate_session_persistence()
print("\n" + "=" * 50)
print("✅ Session management demonstrations completed!")
if __name__ == "__main__":
asyncio.run(main())

View file

@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""
Test script to verify TALESTORM_AGENT_ID usage
"""
import asyncio
from src.services.session_service import session_service
from src.config import settings
async def test_agent_id_usage():
"""Test that the configured agent ID is being used"""
print("Testing TALESTORM_AGENT_ID Usage...")
print("=" * 50)
print(f"Configured Agent ID: {settings.TALESTORM_AGENT_ID}")
print(f"Session Service Agent ID: {session_service.agent_id}")
# Test session creation
print("\nCreating session with configured agent...")
session_id = await session_service.create_session()
if session_id:
print(f"✅ Session created: {session_id}")
# Get session details to verify agent
session_details = await session_service.get_session(session_id)
if session_details:
print(f"Session Agent ID: {session_details.get('agent_id')}")
print(f"Session Organization ID: {session_details.get('organization_id')}")
else:
print("⚠️ Could not retrieve session details")
else:
print("❌ Failed to create session")
# Test listing agents
print("\nListing available agents...")
agents = await session_service.list_agents()
if agents:
print(f"✅ Found {len(agents)} agents:")
for i, agent in enumerate(agents, 1):
print(f" {i}. {agent.get('name', 'Unknown')} (ID: {agent.get('id')})")
# Highlight the configured agent
if str(agent.get('id')) == settings.TALESTORM_AGENT_ID:
print(f" ⭐ This is the configured agent!")
else:
print("❌ No agents found")
print("\n✅ Agent ID test completed!")
async def test_agent_validation():
"""Test that the configured agent exists and is accessible"""
print("\nTesting Agent Validation...")
print("=" * 50)
if not settings.TALESTORM_AGENT_ID:
print("❌ TALESTORM_AGENT_ID not configured")
return
# Try to get the configured agent
agent = await session_service.get_default_agent()
if agent:
print(f"✅ Configured agent found:")
print(f" Name: {agent.get('name', 'Unknown')}")
print(f" ID: {agent.get('id')}")
print(f" Model: {agent.get('model_name', 'Unknown')}")
print(f" Provider: {agent.get('model_provider', 'Unknown')}")
print(f" RAG Enabled: {agent.get('enable_rag_search', False)}")
else:
print("❌ Configured agent not found or not accessible")
print("\n✅ Agent validation completed!")
async def main():
"""Run all tests"""
await test_agent_id_usage()
await test_agent_validation()
if __name__ == "__main__":
asyncio.run(main())

View file

@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""
Test script to verify chat history parsing with actual talestorm-ai format
"""
from src.services.chat_service import ChatService
from src.api.v1.models import HistoryItem
# Sample data from talestorm-ai
sample_history = [
{
"id": "45d3e5f0-21cf-47b3-8c24-0d273107ab7f",
"chat_session_id": "f250bb60-d619-4802-914f-0594d2a52bfa",
"content": [
{
"parts": [
{
"content": "hi",
"timestamp": "2025-07-25T09:49:29.412466Z",
"part_kind": "user-prompt"
}
],
"instructions": "You are an expert insurance assistant. You may only answer questions about insurance plans, coverage, benefits, and related terms. If asked anything outside that domain, reply: 'I'm sorry, I can only answer insurance-related questions.'",
"kind": "request"
},
{
"parts": [
{
"content": "Hello! How can I assist you with your insurance questions today?",
"part_kind": "text"
}
],
"usage": {
"requests": 1,
"request_tokens": 137,
"response_tokens": 14,
"total_tokens": 151,
"details": {
"accepted_prediction_tokens": 0,
"audio_tokens": 0,
"reasoning_tokens": 0,
"rejected_prediction_tokens": 0,
"cached_tokens": 0
}
},
"model_name": "gpt-4.1-mini-2025-04-14",
"timestamp": "2025-07-25T09:49:30Z",
"kind": "response",
"vendor_details": None,
"vendor_id": "chatcmpl-Bx9LOZPgBbHOblSUd6LcuV3qIYEwz"
}
],
"created_at": "2025-07-25T09:40:20.797430"
}
]
def parse_history_manually(messages):
"""Manually parse the history to test our logic"""
history = []
for message in messages:
content = message.get("content", [])
for item in content:
if not isinstance(item, dict):
continue
kind = item.get("kind", "")
# Handle request messages (user input)
if kind == "request":
parts = item.get("parts", [])
for part in parts:
if part.get("part_kind") == "user-prompt":
history.append(HistoryItem(
role="user",
message=part.get("content", "")
))
# Handle response messages (assistant output)
elif kind == "response":
parts = item.get("parts", [])
for part in parts:
if part.get("part_kind") == "text":
history.append(HistoryItem(
role="assistant",
message=part.get("content", "")
))
return history
def test_history_parsing():
"""Test the history parsing logic"""
print("Testing History Parsing...")
print("=" * 50)
# Parse the sample history
history = parse_history_manually(sample_history)
print(f"Found {len(history)} history items:")
for i, item in enumerate(history, 1):
print(f" {i}. {item.role}: {item.message}")
# Verify the expected results
expected_user_message = "hi"
expected_assistant_message = "Hello! How can I assist you with your insurance questions today?"
if len(history) == 2:
if history[0].role == "user" and history[0].message == expected_user_message:
print("✅ User message parsed correctly")
else:
print("❌ User message parsing failed")
if history[1].role == "assistant" and history[1].message == expected_assistant_message:
print("✅ Assistant message parsed correctly")
else:
print("❌ Assistant message parsing failed")
else:
print(f"❌ Expected 2 history items, got {len(history)}")
print("\n✅ History parsing test completed!")
if __name__ == "__main__":
test_history_parsing()

20
src/main.py Normal file
View file

@ -0,0 +1,20 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.api import api_router as v1_router
app = FastAPI()
app.include_router(v1_router, prefix="/api")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def root():
return {"status": "ok"}

1
src/services/__init__.py Normal file
View file

@ -0,0 +1 @@
# Services package

View file

@ -0,0 +1,137 @@
import httpx
from typing import Dict, Any, List, Optional
from ..config import settings
from .session_service import session_service
from ..api.v1.models import Source, HistoryItem
class ChatService:
"""Service for handling chat functionality with talestorm-ai"""
def __init__(self):
self.base_url = settings.TALESTORM_API_BASE_URL
self.api_key = settings.TALESTORM_API_KEY
async def get_client(self) -> httpx.AsyncClient:
"""Get HTTP client for talestorm-ai API"""
headers = {}
if self.api_key:
headers["X-API-Key"] = self.api_key
return httpx.AsyncClient(
headers=headers
)
async def send_message(self, session_id: str, message: str) -> Dict[str, Any]:
"""Send a message to talestorm-ai and get response"""
async with await self.get_client() as client:
try:
response = await client.post(self.base_url + "/chat/", json={
"chat_session_id": session_id,
"user_message": message
})
if response.status_code == 200:
return response.json()
else:
print(response.url)
print(response.status_code)
print(response.json())
raise Exception(f"Error sending message to talestorm-ai: {response.status_code} {response.json()}")
except Exception as e:
return {
"chat_session_id": session_id,
"message": f"I'm sorry, I'm experiencing technical difficulties. Please try again later. Error: {str(e)}"
}
async def get_chat_history(self, session_id: str) -> List[HistoryItem]:
"""Get chat history for a session and format it properly"""
async with await self.get_client() as client:
try:
response = await client.get(self.base_url + "/chat/", params={"chat_session_id": session_id})
if response.status_code == 200:
messages = response.json()
history = []
for message in messages:
content = message.get("content", [])
for item in content:
if not isinstance(item, dict):
continue
kind = item.get("kind", "")
# Handle request messages (user input)
if kind == "request":
parts = item.get("parts", [])
for part in parts:
if part.get("part_kind") == "user-prompt":
history.append(HistoryItem(
role="user",
message=part.get("content", "")
))
# Handle response messages (assistant output)
elif kind == "response":
parts = item.get("parts", [])
for part in parts:
if part.get("part_kind") == "text":
history.append(HistoryItem(
role="assistant",
message=part.get("content", "")
))
return history
return []
except Exception as e:
print(f"Error getting chat history: {e}")
return []
def _extract_sources_from_response(self, response_text: str) -> List[Source]:
"""Extract sources from RAG search results if available"""
# This is a placeholder - in a real implementation, you would:
# 1. Check if the response contains RAG search results
# 2. Parse the results to extract plan names, chunk numbers, and content
# 3. Return properly formatted Source objects
# For now, return empty list - this would be populated when RAG is implemented
return []
async def process_insurance_chat(self, message: str, session_id: Optional[str] = None) -> Dict[str, Any]:
"""Process an insurance chat request"""
try:
# Create session if not provided
if not session_id:
session_id = await session_service.create_session()
# Validate session if provided
elif not await session_service.validate_session(session_id):
# Create new session if invalid
session_id = await session_service.create_session()
# Send message to talestorm-ai
chat_response = await self.send_message(session_id, message)
# Get chat history
history = await self.get_chat_history(session_id)
# Extract sources from the response (placeholder for RAG implementation)
sources = self._extract_sources_from_response(chat_response.get("message", ""))
return {
"session_id": session_id,
"answer": chat_response.get("message", "No response received"),
"sources": sources,
"history": history
}
except Exception as e:
return {
"session_id": session_id or "fallback-session",
"answer": f"I'm sorry, I'm experiencing technical difficulties. Please try again later. Error: {str(e)}",
"sources": [],
"history": []
}
# Global chat service instance
chat_service = ChatService()

View file

@ -0,0 +1,225 @@
from datetime import datetime
import json
from typing import List, Dict, Any, Optional
# ---------------- Constants ---------------- #
DECLINABLE_CONDITIONS = {
"aids", "organ_transplant", "multiple_sclerosis", "alzheimer_disease",
"heart_attack", "congestive_heart_failure", "bipolar", "seizures"
}
DECLINABLE_ISSUE_KEYS = {"heart_attack", "congestive_heart_failure", "bipolar", "seizures"}
HIGH_RISK_FLAGS = {"50_units", "treat_diabetes"}
PRICING_TABLE = {
1: [122, 243, 219, 365],
1.5: [127, 253, 228, 379],
2: [133, 265, 239, 397],
2.5: [139, 277, 249, 415],
3: [145, 290, 261, 435],
3.5: [151, 302, 272, 452],
4: [158, 315, 283, 472],
4.5: [164, 327, 295, 491],
5: [172, 343, 309, 514],
5.5: [182, 364, 327, 545],
6: [196, 390, 351, 585],
6.5: [209, 417, 376, 626],
7: [222, 444, 400, 666]
}
# Simplified drug price map (in a real implementation, you'd load this from a file)
DRUG_PRICE_MAP = {
"metformin": 15.0,
"insulin": 45.0,
"aspirin": 8.0,
"atorvastatin": 25.0,
"lisinopril": 12.0,
"amlodipine": 18.0,
"omeprazole": 22.0,
"simvastatin": 20.0,
"hydrochlorothiazide": 10.0,
"losartan": 30.0
}
# ---------------- Helper Functions ---------------- #
def calculate_age(dob_str: str, effective_date_str: str) -> Optional[int]:
"""Calculate age from date of birth and effective date"""
try:
dob = datetime.strptime(dob_str, "%d/%m/%Y")
ref = datetime.strptime(effective_date_str, "%d/%m/%Y")
return (ref - dob).days // 365
except (ValueError, TypeError):
return None
def calculate_bmi(weight: float, ft: int, inch: int) -> Optional[float]:
"""Calculate BMI from weight and height"""
try:
inches = ft * 12 + inch
return (weight / (inches ** 2)) * 703
except (ValueError, TypeError, ZeroDivisionError):
return None
def check_declinable(phq: Dict[str, Any]) -> bool:
"""Check if application should be declined based on PHQ data"""
if phq.get("treatment") or phq.get("invalid"):
return True
for cond in phq.get("conditions", []):
if cond.get("key") in DECLINABLE_CONDITIONS:
return True
for issue in phq.get("issues", []):
for detail in issue.get("details", []):
if detail.get("key") in DECLINABLE_ISSUE_KEYS:
return True
return False
def base_tier(age: Optional[int]) -> float:
"""Calculate base tier based on age"""
if age is None:
return 4.0
if age <= 30:
return 1.0
elif age <= 40:
return 2.0
elif age <= 50:
return 3.0
elif age <= 60:
return 3.5
else:
return 4.0
def adjust_tier(tier: float, bmi: Optional[float], meds: List[Dict[str, Any]],
issues: List[Dict[str, Any]], applicant_index: int) -> float:
"""Adjust tier based on health factors"""
if bmi is not None and (bmi < 16 or bmi > 40):
tier += 1.0
personal_meds = [m for m in meds if m.get("applicant") == applicant_index]
if len(personal_meds) >= 3:
tier += 0.5
for issue in issues:
for detail in issue.get("details", []):
if detail.get("key") in HIGH_RISK_FLAGS:
tier += 0.5
return round(min(tier, 7.0) * 2) / 2
def calculate_rx_spend(medications: List[Dict[str, Any]], applicant_index: int) -> float:
"""Calculate total prescription spend for an applicant"""
total = 0.0
for med in medications:
if med.get("applicant") != applicant_index:
continue
drug = med.get("name", "").lower().strip()
if drug in DRUG_PRICE_MAP:
total += DRUG_PRICE_MAP[drug]
return round(total, 2)
def get_rx_price(tier: float, coverage_type: int) -> float:
"""Get prescription price based on tier and coverage type"""
tier = round(tier * 2) / 2
coverage_index = {
1: 0, 2: 1, 3: 2, 4: 3
}.get(coverage_type, 0)
return PRICING_TABLE.get(tier, PRICING_TABLE[5])[coverage_index]
# ---------------- Main Underwriting Logic ---------------- #
def run_underwriting(applicants: List[Dict[str, Any]], phq: Dict[str, Any],
plans: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Run comprehensive underwriting analysis"""
results = []
max_tier = 0
dtq_found = False
ages = []
# Calculate ages for all applicants
for applicant in applicants:
age = calculate_age(applicant.get("dob"), phq.get("effectiveDate"))
if age is not None:
ages.append(age)
oldest_age = max(ages) if ages else None
base = base_tier(oldest_age) if oldest_age else None
coverage = plans[0]["coverage"] if plans else 1
coverage_index = {1: 0, 2: 1, 3: 2, 4: 3}.get(coverage, 0)
for idx, applicant in enumerate(applicants):
try:
age = calculate_age(applicant.get("dob"), phq.get("effectiveDate"))
bmi = calculate_bmi(
applicant.get("weight"),
applicant.get("heightFt"),
applicant.get("heightIn")
)
name = applicant.get("firstName", "Unknown")
applicant_type = {
1: "Primary",
2: "Spouse",
3: "Dependent"
}.get(applicant.get("applicant"), "Unknown")
is_dtq = check_declinable(phq)
rx_spend = calculate_rx_spend(phq.get("medications", []), idx)
if is_dtq:
tier = 5.0
dtq_found = True
message = "Declined due to high-risk conditions (DTQ triggered)."
elif age is None or bmi is None or base is None:
tier = 4.5
message = "Assigned fallback Tier 4.5 due to missing age or BMI."
else:
tier = base
tier = adjust_tier(tier, bmi, phq.get("medications", []),
phq.get("issues", []), idx)
tier_price = PRICING_TABLE.get(tier, PRICING_TABLE[5])[coverage_index]
max_price = PRICING_TABLE[7][coverage_index]
if rx_spend > max_price:
tier = 5.0
message = f"Declined due to high Rx spend (${rx_spend} > ${max_price})"
elif rx_spend > tier_price:
for t in sorted(PRICING_TABLE.keys()):
if PRICING_TABLE[t][coverage_index] >= rx_spend:
tier = t
break
message = f"Rx spend ${rx_spend} increased tier to {tier}."
else:
message = f"Tier {tier} assigned with Rx spend ${rx_spend} within allowed limits."
except Exception as e:
tier = 4.5
rx_spend = 0.0
message = f"Fallback Tier 4.5 due to system error: {str(e)}"
max_tier = max(max_tier, tier)
results.append({
"name": name,
"applicant_type": applicant_type,
"age": age,
"bmi": round(bmi, 2) if bmi else None,
"tier": tier,
"rx_spend": rx_spend,
"message": message
})
total_price = get_rx_price(max_tier, coverage)
return {
"results": results,
"combined": {
"tier": max_tier,
"total_price": total_price,
"dtq": dtq_found,
"message": f"Final assigned tier is {max_tier} based on age, health, Rx cost, or fallback logic."
}
}

View file

@ -0,0 +1,111 @@
import httpx
import uuid
from typing import Optional, Dict, Any, List
from ..config import settings
class SessionService:
"""Service for managing chat sessions with talestorm-ai"""
def __init__(self):
self.base_url = settings.TALESTORM_API_BASE_URL
self.api_key = settings.TALESTORM_API_KEY
self.agent_id = settings.TALESTORM_AGENT_ID
async def get_client(self) -> httpx.AsyncClient:
"""Get HTTP client for talestorm-ai API"""
headers = {}
if self.api_key:
headers["X-API-Key"] = self.api_key
return httpx.AsyncClient(
base_url=self.base_url,
headers=headers
)
async def list_agents(self) -> List[Dict[str, Any]]:
"""List available agents from talestorm-ai"""
async with await self.get_client() as client:
try:
response = await client.get("/agents/")
if response.status_code == 200:
return response.json()
else:
return []
except Exception:
return []
async def get_default_agent(self) -> Optional[Dict[str, Any]]:
"""Get the configured agent or the first available agent"""
# First try to get the configured agent
if self.agent_id:
async with await self.get_client() as client:
try:
response = await client.get(f"/agents/{self.agent_id}")
if response.status_code == 200:
return response.json()
except Exception:
pass
# Fallback to first available agent
agents = await self.list_agents()
if agents:
return agents[0]
return None
async def create_session(self, agent_id: Optional[str] = None) -> Optional[str]:
"""Create a new chat session in talestorm-ai"""
async with await self.get_client() as client:
try:
# Use provided agent_id, then configured agent_id, then fallback to default
if not agent_id:
if self.agent_id:
agent_id = self.agent_id
else:
default_agent = await self.get_default_agent()
if not default_agent:
# Create a simple session ID for now
return str(uuid.uuid4())
agent_id = str(default_agent["id"])
# Create session with talestorm-ai
response = await client.post("/sessions/", params={"agent_id": agent_id})
if response.status_code == 200:
session_data = response.json()
return str(session_data["id"])
else:
# Fallback to local session ID
return str(uuid.uuid4())
except Exception:
# Fallback to local session ID
return str(uuid.uuid4())
async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""Get session details from talestorm-ai"""
async with await self.get_client() as client:
try:
response = await client.get(f"/sessions/{session_id}")
if response.status_code == 200:
return response.json()
return None
except Exception:
return None
async def list_sessions(self) -> List[Dict[str, Any]]:
"""List all sessions from talestorm-ai"""
async with await self.get_client() as client:
try:
response = await client.get("/sessions/")
if response.status_code == 200:
return response.json()
return []
except Exception:
return []
async def validate_session(self, session_id: str) -> bool:
"""Validate if a session exists and is accessible"""
session = await self.get_session(session_id)
return session is not None
# Global session service instance
session_service = SessionService()