virgil-ai/src/cache/drug_cache.py

139 lines
4 KiB
Python

import json
import math
import re

import httpx
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError

from src.config import settings
from src.database import Drug, Session
from src.drug_price_parser import DrugPriceParser, DrugPriceResponse
from src.services.session_service import session_service
class DrugFull(BaseModel):
    """Validated, database-independent record of a single drug entry."""

    # Name of the drug.
    name: str
    # Numeric strength; interpreted together with `dosage_unit`.
    dosage: float
    # Unit text for `dosage`. This module recognises "mg", "mcg", "g" and
    # compound forms such as "mg/5ml".
    dosage_unit: str
    # Price figure for the entry (presumably per unit of sale -- confirm
    # against the data source).
    unit_price: float
    # Optional free-text description.
    description: str | None = None
def make_drug_full(drug: Drug):
    """Build a `DrugFull` from a `Drug` database row by copying its fields."""
    field_values = {
        "name": drug.name,
        "dosage": drug.dosage,
        "dosage_unit": drug.dosage_unit,
        "unit_price": drug.unit_price,
        "description": drug.description,
    }
    return DrugFull(**field_values)
async def convert_drug_result(drug: DrugPriceResponse) -> list[DrugFull]:
    """Send a price-parser result to the Talestorm drug agent and parse its reply.

    A chat session is created for the agent configured in
    ``settings.TALESTORM_DRUG_AGENT_ID``; the JSON dump of ``drug`` is posted
    to ``/chat/`` as the user message. The reply's ``"message"`` field is
    itself a JSON document whose ``"result"`` list is validated item by item
    into `DrugFull` objects.

    Args:
        drug: Price-parser response to convert.

    Returns:
        One `DrugFull` per entry of the reply's ``"result"`` list.

    Raises:
        httpx.HTTPStatusError: The API answered with a 4xx/5xx status.
        httpx.HTTPError: The request itself failed (timeout, connection, ...).
        KeyError, json.JSONDecodeError: The reply does not have the expected
            shape.
        pydantic.ValidationError: A result entry is not a valid `DrugFull`.
    """
    session_id = await session_service.create_session(
        agent_id=settings.TALESTORM_DRUG_AGENT_ID
    )
    # The context manager closes the client (and its connection pool) even
    # when the request raises; the response body is fully read by `post`, so
    # it remains usable after the client is closed.
    async with httpx.AsyncClient(
        base_url=settings.TALESTORM_API_BASE_URL,
        headers={"X-API-Key": settings.TALESTORM_API_KEY},
        timeout=httpx.Timeout(60.0),
    ) as client:
        response = await client.post(
            "/chat/",
            json={
                "chat_session_id": session_id,
                "user_message": drug.model_dump_json(),
            },
        )
    # Fail with a clear HTTP error rather than an obscure KeyError/JSON error
    # when the service returns an error payload.
    response.raise_for_status()
    payload = json.loads(response.json()["message"])
    return [DrugFull.model_validate(entry) for entry in payload["result"]]
async def download_drug(drug_name: str):
    """Fetch and store the drug unless rows with this name already exist.

    The database is queried for rows whose name equals ``drug_name``; when
    any are found nothing else happens. Otherwise the price parser is run,
    its result converted, and the converted drugs stored.
    """
    with Session() as db:
        existing_rows = db.query(Drug).filter(Drug.name == drug_name).all()
    if existing_rows:
        return

    price_result = DrugPriceParser().get_drug_prices(drug_name)
    converted = await convert_drug_result(price_result)
    await store_drug(converted)
async def get_drug(drug_name: str) -> list[DrugFull]:
    """Run the price parser for ``drug_name`` and return the converted drugs.

    Nothing is read from or written to the database here.
    """
    price_result = DrugPriceParser().get_drug_prices(drug_name)
    return await convert_drug_result(price_result)
async def store_drug(drugs: list[DrugFull]):
    """Insert each drug as a `Drug` row, committing one row at a time.

    A row that triggers an `IntegrityError` is rolled back and skipped so
    the remaining drugs are still attempted.
    """
    with Session() as db:
        for item in drugs:
            row = Drug(
                name=item.name,
                dosage=item.dosage,
                dosage_unit=item.dosage_unit,
                unit_price=item.unit_price,
                description=item.description,
            )
            try:
                db.add(row)
                db.commit()
            except IntegrityError:
                # Rejected by a database constraint: undo and move on.
                db.rollback()
def parse_dosage_unit(dosage_unit: str):
    """Split a compound unit such as ``"mg/5ml"`` into its three parts.

    Args:
        dosage_unit: Unit text of the form
            ``<letters> / <optional number> <letters>``; whitespace around
            the slash and the number is allowed.

    Returns:
        ``(numerator_unit, quantity, denominator_unit)`` when the text has
        that form. ``quantity`` is a float and is ``1.0`` when no number is
        written (``"mg/ml"``). ``(None, 1, None)`` for any other text.
    """
    # The number is optional but, when present, must contain at least one
    # digit, so `float()` below can never receive "" or ".".
    m = re.match(
        r"^([a-zA-Z]+)\s*/\s*(\d+(?:\.\d*)?|\.\d+)?\s*([a-zA-Z]+)$",
        dosage_unit,
        re.IGNORECASE,
    )
    if m is None:
        return None, 1, None
    quantity_text = m.group(2)
    quantity = float(quantity_text) if quantity_text else 1.0
    return m.group(1), quantity, m.group(3)
def _dosage_matches(unit: str, requested: float, stored: float) -> bool:
    """Tell whether ``requested`` equals ``stored`` once scaled for ``unit``."""
    if unit == "mg":
        expected = requested
    elif unit == "mcg":
        expected = requested * 1000
    elif unit == "g":
        expected = requested / 1000
    else:
        return False
    # Scaling by 1000 can introduce a rounding error in the last bits, so
    # compare with a relative tolerance instead of exact equality.
    return math.isclose(expected, stored)


def check_drug_dosage(d: DrugFull, dosage: float) -> DrugFull | None:
    """Return ``d`` when its strength corresponds to ``dosage``, else ``None``.

    ``dosage`` is compared with ``d.dosage`` as-is for unit ``"mg"``,
    multiplied by 1000 for ``"mcg"`` and divided by 1000 for ``"g"`` (so
    ``dosage`` is presumably given in milligrams -- confirm with callers).
    Unit names are matched case-sensitively.

    For a compound unit such as ``"mg/5ml"`` the unit before the slash is
    used for the comparison and, on a match, ``d.unit_price`` is multiplied
    by the quantity after the slash.

    NOTE: the price scaling is applied to ``d`` in place; calling this twice
    on the same object with a compound unit scales the price twice.

    Args:
        d: Candidate drug.
        dosage: Requested strength.

    Returns:
        ``d`` itself on a match, otherwise ``None``.
    """
    if _dosage_matches(d.dosage_unit, dosage, d.dosage):
        return d

    numerator_unit, quantity, _ = parse_dosage_unit(d.dosage_unit)
    if numerator_unit is not None and _dosage_matches(numerator_unit, dosage, d.dosage):
        d.unit_price *= quantity
        return d
    return None
async def fetch_drug_with_dosage(drug_name: str, dosage: float) -> DrugFull | None:
    """Find the entry of ``drug_name`` whose strength matches ``dosage``.

    Rows already stored in the database are checked first. When none of them
    matches (or the lookup fails), the drug is fetched via `get_drug`, the
    fetched entries are stored on a best-effort basis, and then checked.

    Args:
        drug_name: Name used both for the database lookup and the fetch.
        dosage: Requested strength, as understood by `check_drug_dosage`.

    Returns:
        The first matching `DrugFull`, or ``None`` when nothing matches.

    Raises:
        Whatever `get_drug` raises; database errors are reported and
        otherwise ignored.
    """
    try:
        with Session() as session:
            cached_rows = session.query(Drug).filter(Drug.name == drug_name).all()
            for row in cached_rows:
                match = check_drug_dosage(make_drug_full(row), dosage)
                if match is not None:
                    return match
    except Exception as e:
        # The cache is only an optimisation: report the failure instead of
        # hiding it, then fall through to a fresh fetch.
        print(f"Error reading cached drug {drug_name}: {e}")

    drugs = await get_drug(drug_name)
    try:
        await store_drug(drugs)
    except Exception as e:
        # Failing to cache must not prevent answering the request.
        print(f"Error storing drug {drug_name}: {e}")

    for candidate in drugs:
        match = check_drug_dosage(candidate, dosage)
        if match is not None:
            return match
    return None