virgil-ai/src/cache/redis_cache.py

131 lines
3.6 KiB
Python

import functools
import json
import logging

import httpx
import redis

from src.config import settings
from src.drug_price_parser import DrugPriceParser, DrugPriceResponse
# Process-wide Redis client; stays None until get_redis_client() builds it.
_redis_client = None


def get_redis_client():
    """Return the shared Redis client, constructing it on the first call.

    Host, port and database index are read from ``settings``. The client is
    created with ``decode_responses=True``, 5-second connect and socket
    timeouts, and ``retry_on_timeout`` enabled. Subsequent calls return the
    same instance.
    """
    global _redis_client
    if _redis_client is not None:
        return _redis_client

    connection_options = {
        "host": settings.REDIS_HOST,
        "port": settings.REDIS_PORT,
        "db": settings.REDIS_DB,
        "decode_responses": True,
        "socket_connect_timeout": 5,
        "socket_timeout": 5,
        "retry_on_timeout": True,
    }
    _redis_client = redis.Redis(**connection_options)
    return _redis_client
def redis_cache(cache_key: str):
    """Build a decorator that caches a function's JSON result in Redis.

    The decorated function's return value is serialized with ``json.dumps``
    and stored under the fixed key ``cache_key`` with an expiry of
    ``settings.REDIS_CACHE_TTL``. The key does not depend on the call
    arguments, so every call of the decorated function shares one entry.

    Caching is best-effort: any failure while reading or writing the cache
    is logged as a warning and the call proceeds as if no cache existed.

    Args:
        cache_key: Redis key under which the serialized result is stored.

    Returns:
        A decorator adding the read-through cache to a function.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            log = logging.getLogger(__name__)
            redis_client = get_redis_client()

            try:
                cached_data = redis_client.get(cache_key)
                if cached_data:
                    return json.loads(cached_data)
            except Exception:
                # Redis unavailable or the entry is not valid JSON: fall
                # through and compute the value, but leave a trace.
                log.warning(
                    "Redis cache read failed for key %r; calling the "
                    "wrapped function instead",
                    cache_key,
                    exc_info=True,
                )

            result = func(*args, **kwargs)

            try:
                result_json = json.dumps(result)
                redis_client.setex(
                    cache_key, settings.REDIS_CACHE_TTL, result_json
                )
            except Exception:
                # A failed write must never prevent returning a result that
                # was already computed successfully.
                log.warning(
                    "Redis cache write failed for key %r; result not cached",
                    cache_key,
                    exc_info=True,
                )
            return result

        return wrapper

    return decorator
def fetch_virgil_api(endpoint: str):
    """GET ``endpoint`` from the Virgil API and return the decoded JSON body.

    Args:
        endpoint: Path appended verbatim to ``settings.VIRGIL_API_BASE_URL``.

    Returns:
        The parsed JSON response.

    Raises:
        httpx.HTTPStatusError: If ``raise_for_status`` rejects the response.
    """
    target_url = settings.VIRGIL_API_BASE_URL + endpoint
    with httpx.Client() as http:
        reply = http.get(target_url)
        reply.raise_for_status()
        return reply.json()
@redis_cache("plans:all")
def fetch_plans():
    """Fetch ``/insurances/plans/full`` from the Virgil API.

    The result is cached in Redis under the key ``plans:all``.
    """
    plans = fetch_virgil_api("/insurances/plans/full")
    return plans
@redis_cache("conditions:all")
def fetch_conditions():
    """Fetch ``/health/conditions`` from the Virgil API.

    The result is cached in Redis under the key ``conditions:all``.
    """
    conditions = fetch_virgil_api("/health/conditions")
    return conditions
@redis_cache("issues:all")
def fetch_issues():
    """Fetch ``/health/issues`` from the Virgil API.

    The result is cached in Redis under the key ``issues:all``.
    """
    issues = fetch_virgil_api("/health/issues")
    return issues
@redis_cache("issue_details:all")
def fetch_issue_details():
    """Fetch ``/health/issue-details`` from the Virgil API.

    The result is cached in Redis under the key ``issue_details:all``.
    """
    issue_details = fetch_virgil_api("/health/issue-details")
    return issue_details
def get_plan_by_id(plan_id: int):
    """Return the first plan whose ``"id"`` equals ``plan_id``.

    Plans are obtained from ``fetch_plans()`` and scanned in order.

    Args:
        plan_id: Identifier compared against each plan's ``"id"`` entry.

    Returns:
        The matching plan, or ``None`` when no plan matches.
    """
    matching = (
        candidate for candidate in fetch_plans() if candidate["id"] == plan_id
    )
    return next(matching, None)
def fetch_drug(drug_name: str) -> DrugPriceResponse:
    """Return price data for ``drug_name``, using Redis as a best-effort cache.

    A cached entry under ``drugs:<drug_name>`` is deserialized with
    ``DrugPriceResponse.model_validate_json``. On a miss (or any cache
    failure) the data is obtained from ``DrugPriceParser.get_drug_prices``
    and stored with an expiry of ``settings.REDIS_CACHE_TTL_DRUGS``.

    Cache read/write failures are logged as warnings and never propagate.

    Args:
        drug_name: Name passed to the parser and used in the cache key.

    Returns:
        The drug price response, from cache or freshly parsed.
    """
    log = logging.getLogger(__name__)
    cache_key = f"drugs:{drug_name}"
    redis_client = get_redis_client()

    try:
        cached_data = redis_client.get(cache_key)
        if cached_data:
            return DrugPriceResponse.model_validate_json(cached_data)
    except Exception:
        # Redis unavailable or the cached payload no longer validates:
        # fall back to a fresh lookup, but leave a trace.
        log.warning(
            "Redis cache read failed for key %r; fetching fresh data",
            cache_key,
            exc_info=True,
        )

    parser = DrugPriceParser()
    result = parser.get_drug_prices(drug_name)

    try:
        result_json = result.model_dump_json()
        redis_client.setex(cache_key, settings.REDIS_CACHE_TTL_DRUGS, result_json)
    except Exception:
        # A failed write must not discard the result already obtained.
        log.warning(
            "Redis cache write failed for key %r; result not cached",
            cache_key,
            exc_info=True,
        )
    return result
def search_drug(drug_name: str) -> str:
    """Resolve ``drug_name`` via the drugs.com price-guide autocomplete API.

    The ``url`` of the first result in the first category is returned with
    every ``"/price-guide/"`` occurrence removed. Results are cached in
    Redis under ``drug_search:<drug_name>`` with an expiry of
    ``settings.REDIS_CACHE_TTL_DRUGS``; cache failures are logged as
    warnings and never propagate.

    Args:
        drug_name: Search term sent as the ``s`` query parameter.

    Returns:
        The first result's URL with ``"/price-guide/"`` stripped.

    Raises:
        httpx.HTTPStatusError: If the autocomplete API returns an error
            status.
        KeyError, IndexError: If the response has no categories/results in
            the expected structure.
    """
    log = logging.getLogger(__name__)
    cache_key = f"drug_search:{drug_name}"
    redis_client = get_redis_client()

    try:
        cached_data = redis_client.get(cache_key)
        if cached_data:
            return cached_data
    except Exception:
        log.warning(
            "Redis cache read failed for key %r; querying the API instead",
            cache_key,
            exc_info=True,
        )

    # The context manager closes the client's connections, which the
    # previous bare httpx.Client(...) never did.
    with httpx.Client(
        base_url="https://www.drugs.com/api/autocomplete",
        timeout=httpx.Timeout(60.0, connect=10.0),
    ) as client:
        # Passing the term via params lets httpx percent-encode it, so names
        # containing '&', '#', spaces, etc. cannot corrupt the query string.
        response = client.get(
            "/", params={"type": "price-guide", "s": drug_name}
        )
        response.raise_for_status()
        response_json = response.json()

    first_url = response_json["categories"][0]["results"][0]["url"]
    result = first_url.replace("/price-guide/", "")

    try:
        redis_client.setex(cache_key, settings.REDIS_CACHE_TTL_DRUGS, result)
    except Exception:
        # A failed write must not discard the result already obtained.
        log.warning(
            "Redis cache write failed for key %r; result not cached",
            cache_key,
            exc_info=True,
        )
    return result