Spaces:
Running
Running
| # Import necessary libraries | |
| import os # Interacting with the operating system (reading/writing files) | |
| import chromadb # High-performance vector database for storing/querying dense vectors | |
| from dotenv import load_dotenv # Loading environment variables from a .env file | |
| import json # Parsing and handling JSON data | |
| # LangChain imports | |
| from langchain_core.documents import Document # Document data structures | |
| from langchain_core.runnables import RunnablePassthrough # LangChain core library for running pipelines | |
| from langchain_core.output_parsers import StrOutputParser # String output parser | |
| from langchain.prompts import ChatPromptTemplate # Template for chat prompts | |
| from langchain.chains.query_constructor.base import AttributeInfo # Base classes for query construction | |
| from langchain.retrievers.self_query.base import SelfQueryRetriever # Base classes for self-querying retrievers | |
| from langchain.retrievers.document_compressors import LLMChainExtractor, CrossEncoderReranker # Document compressors | |
| from langchain.retrievers import ContextualCompressionRetriever # Contextual compression retrievers | |
| # LangChain community & experimental imports | |
| from langchain_community.vectorstores import Chroma # Implementations of vector stores like Chroma | |
| from langchain_community.document_loaders import PyPDFDirectoryLoader, PyPDFLoader # Document loaders for PDFs | |
| from langchain_community.cross_encoders import HuggingFaceCrossEncoder # Cross-encoders from HuggingFace | |
| from langchain_experimental.text_splitter import SemanticChunker # Experimental text splitting methods | |
| from langchain.text_splitter import ( | |
| CharacterTextSplitter, # Splitting text by characters | |
| RecursiveCharacterTextSplitter # Recursive splitting of text by characters | |
| ) | |
| from langchain_core.tools import tool | |
| from langchain.agents import create_tool_calling_agent, AgentExecutor | |
| from langchain_core.prompts import ChatPromptTemplate | |
| # LangChain OpenAI imports | |
| from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI # OpenAI embeddings and models | |
| from langchain_community.embeddings import OpenAIEmbeddings | |
| # from langchain.embeddings.openai import OpenAIEmbeddings # OpenAI embeddings for text vectors | |
| # LlamaParse & LlamaIndex imports | |
| from llama_parse import LlamaParse # Document parsing library | |
| from llama_index.core import Settings, SimpleDirectoryReader # Core functionalities of the LlamaIndex | |
| # LangGraph import | |
| from langgraph.graph import StateGraph, END, START # State graph for managing states in LangChain | |
| # Pydantic import | |
| from pydantic import BaseModel # Pydantic for data validation | |
| # Typing imports | |
| from typing import Dict, List, Tuple, Any, TypedDict # Python typing for function annotations | |
| # Other utilities | |
| import numpy as np # Numpy for numerical operations | |
| from groq import Groq | |
| from mem0 import MemoryClient | |
| import streamlit as st | |
| from datetime import datetime | |
| #====================================SETUP=====================================# | |
# Module-level configuration. Every secret is read from an environment
# variable (on Hugging Face Spaces these are the configured secrets); a
# missing variable raises KeyError while the module is being imported.
api_key = os.environ['AZURE_OPENAI_KEY']
endpoint = os.environ['AZURE_OPENAI_ENDPOINT']
model_name = os.environ['CHATGPT_MODEL']
api_version = os.environ['AZURE_OPENAI_APIVERSION']
emb_key = os.environ['EMB_MODEL_KEY']
emb_endpoint = os.environ['EMB_DEPLOYMENT']
llamaparse_api_key = os.environ['LLAMA_KEY']
groq_api_key = os.environ['GROQ_API_KEY']
MEM0_api_key = os.environ['MEM0_API_KEY']

# Groq client; filter_input_with_llama_guard() sends user input through it.
llama_guard_client = Groq(api_key=groq_api_key)

# Chroma-native OpenAI embedding function configured for Azure. It reuses the
# chat endpoint, key and API version read above.
# NOTE(review): not referenced anywhere in the visible part of this file.
embedding_function = chromadb.utils.embedding_functions.OpenAIEmbeddingFunction(
    api_base=endpoint,  # Azure OpenAI endpoint (from the environment)
    api_key=api_key,  # Azure OpenAI key (from the environment)
    api_type='azure',
    api_version=api_version,  # taken from the environment, not a fixed value
    model_name='text-embedding-ada-002'
)

# LangChain embedding model; it has its own endpoint/key pair and a pinned
# API version. Passed to the Chroma vector store further down in this file.
embedding_model = AzureOpenAIEmbeddings(
    azure_endpoint=emb_endpoint,
    api_key=emb_key,
    api_version='2023-05-15',
    model='text-embedding-ada-002'
)

# Chat model shared by all workflow nodes; temperature 0 keeps sampling
# randomness to a minimum.
llm = AzureChatOpenAI(
    azure_endpoint=endpoint,
    api_key=api_key,
    api_version=api_version,
    azure_deployment=model_name,  # the deployment name comes from CHATGPT_MODEL
    temperature=0
)

# Register the models on the LlamaIndex global Settings object.
Settings.llm = llm
# NOTE(review): LlamaIndex documents the embedding setting as
# `Settings.embed_model`; assigning `Settings.embedding` presumably has no
# effect. Confirm, and confirm the LangChain bridge package is installed,
# before renaming.
Settings.embedding = embedding_model
| #================================Creating Langgraph agent======================# | |
class AgentState(TypedDict):
    """State dictionary handed from node to node in the LangGraph workflow."""

    # --- query side -------------------------------------------------------
    query: str  # the user's question as received
    expanded_query: str  # text produced by the query-expansion node
    query_feedback: str  # suggestions for the next expansion attempt

    # --- retrieval and generation -----------------------------------------
    context: List[Dict[str, Any]]  # retrieved documents: content + metadata
    response: str  # the answer generated from the context
    feedback: str  # suggestions for the next answer attempt

    # --- evaluation ---------------------------------------------------------
    groundedness_score: float  # how well the context supports the answer
    precision_score: float  # how directly the answer addresses the query
    groundedness_check: bool

    # --- loop control -------------------------------------------------------
    groundedness_loop_count: int  # groundedness evaluations performed so far
    precision_loop_count: int  # precision evaluations performed so far
    loop_max_iter: int  # counter value beyond which the workflow gives up
def expand_query(state):
    """
    Ask the LLM for alternative phrasings of the user's query.

    Args:
        state (Dict): Workflow state; reads 'query' and 'query_feedback'.

    Returns:
        Dict: The same state object with 'expanded_query' set to the raw
        text returned by the model.
    """
    instructions = """
You are a helpful and harmless AI assistant. Your task is to expand the user's query related to nutrition disorders to improve information retrieval.
If there are multiple common ways of phrasing a user's query or common synonyms for key words in the question, make sure to return multiple versions
of the query with the different phrasings.
If the query has multiple parts, split them into separate simpler queries. This is the only case where you can generate more than 3 queries.
If there are acronyms or words you are not familiar with, do not try to rephrase them.
Return only 3 versions of the question as a list.
Generate only a list of questions. Do not mention anything before or after the list.
**Guidelines for Query Expansion:**
1. **Retain Original Intent:** Ensure the expanded query accurately reflects the user's original information need. Avoid introducing new topics or shifting the focus.
2. **Add Relevant Keywords:** Introduce keywords and phrases commonly associated with nutritional disorders that are relevant to the user's query. This might include symptoms, causes, risk factors, diagnostic terms, treatment approaches, or related conditions.
3. **Consider Query Feedback:** If query feedback is available, incorporate it to refine the query further. Address any ambiguities or missing information highlighted in the feedback.
4. **Conciseness and Clarity:** Keep the expanded query concise and easy to understand. Avoid overly complex or lengthy phrases.
5. **Focus on Nutritional Aspects:** The expanded query should prioritize aspects related to nutrition, diet, and dietary habits.
6. **Medical Accuracy:** While not a medical professional, strive for medical accuracy by using terminology and concepts consistent with established nutritional science.
7. **Output format:** Return only 3 versions of the expanded queries as a list and do not mention anything before or after the list.
**Example:**
**User Query:** "What are the effects of vitamin D deficiency?"
**Query Feedback:** "Focus on bone health."
**Expanded Query:** "[
'How does vitamin D deficiency affect bone health in children, adults, and the elderly?',
'What are the long-term effects of chronic vitamin D deficiency on bone density and fracture risk?',
'How does vitamin D deficiency contribute to osteoporosis, rickets, and other bone-related disorders?'
]"
**Remember:** Your goal is to create an expanded query that helps retrieve the most relevant and accurate information about nutrition disorders from a knowledge base.
"""
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", instructions),
            ("user", "Expand this query: {query} using the feedback: {query_feedback}"),
        ]
    )
    pipeline = prompt | llm | StrOutputParser()

    payload = {
        "query": state['query'],
        "query_feedback": state["query_feedback"],
    }
    rewritten = pipeline.invoke(payload)
    print("expanded_query", rewritten)

    state["expanded_query"] = rewritten
    return state
# Chroma collection persisted under ./nutritional_db; queries are embedded
# with the Azure embedding model configured above.
vector_store = Chroma(
    embedding_function=embedding_model,
    persist_directory="./nutritional_db",
    collection_name="nutritional_hypotheticals",
)

# Plain similarity search that returns the three closest documents.
retriever = vector_store.as_retriever(search_kwargs={'k': 3}, search_type='similarity')
def retrieve_context(state):
    """
    Look up documents for the expanded query and store them in the state.

    Args:
        state (Dict): Workflow state; reads 'expanded_query'.

    Returns:
        Dict: The same state object with 'context' set to a list of
        {"content": ..., "metadata": ...} dicts, one per retrieved document.
    """
    print("---------retrieve_context---------")
    search_text = state['expanded_query']

    hits = retriever.invoke(search_text)
    print("Retrieved documents:", hits)  # raw retriever output, for debugging

    # Keep only the text and metadata of each document.
    extracted = []
    for hit in hits:
        extracted.append({"content": hit.page_content, "metadata": hit.metadata})

    state['context'] = extracted
    print("Extracted context with metadata:", extracted)
    return state
def craft_response(state: Dict) -> Dict:
    """
    Generate an answer to the user's query from the retrieved context.

    Args:
        state (Dict): Workflow state; reads 'query', 'context' (list of dicts
            with a 'content' key) and 'feedback'.

    Returns:
        Dict: The same state object with 'response' set to the answer text.
    """
    print("---------craft_response---------")
    system_message = """
You are an helpful AI assisstant specializing in nutrition disorders,vitamin and mineral deficiency.
Your job is to answer user query based on the provided Context
Use the following guidelines:
- If you're unsure about something, ask for clarification
- Only answer the question based on the provided Context
- Use the feedback if provided to refine your answer
"""
    response_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nContext: {context}\n\nfeedback: {feedback}")
    ])

    # Parse the model output to a plain string, as every other chain in this
    # file does. Without the parser 'response' would hold a message object,
    # and its repr (content plus metadata) would be interpolated into the
    # groundedness / precision / refinement prompts that read it later.
    chain = response_prompt | llm | StrOutputParser()

    response = chain.invoke({
        "query": state['query'],
        "context": "\n".join(doc["content"] for doc in state['context']),
        "feedback": state['feedback'],  # empty on the first pass
    })
    state['response'] = response
    print("intermediate response: ", response)
    return state
def score_groundedness(state: Dict) -> Dict:
    """
    Ask the LLM how well the response is supported by the retrieved context.

    Args:
        state (Dict): Workflow state; reads 'context' and 'response' and
            increments 'groundedness_loop_count'.

    Returns:
        Dict: The same state object with 'groundedness_score' set to a float
        in [0.0, 1.0]. If no number can be found in the model's reply the
        score is 0.0, so the workflow treats the answer as not grounded
        instead of crashing.
    """
    import re  # local import keeps this block self-contained

    print("---------check_groundedness---------")
    system_message = '''
You are an AI assistant tasked with evaluating the groundedness of responses based on the provided context.
Your goal is to determine if the response aligns with and is supported by the given context.
Guidelines for scoring:
Give the response a score of one decimal point between 0.0 and 1.0 based on the following criteria:
- 1.0 **: The response is entirely supported by the context.
- 0.0 **: The response is entirely unsupported by the context, or response no support from the context.
Evaluate the given response against the provided context and return a **groundedness_score** based on the above criteria.
Stricly just return the groundedness_score and do not explain your response.
'''
    groundedness_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Context: {context}\nResponse: {response}\n\nGroundedness score:")
    ])
    chain = groundedness_prompt | llm | StrOutputParser()
    raw_score = chain.invoke({
        "context": "\n".join(doc["content"] for doc in state['context']),
        "response": state['response'],
    })

    # The model is asked for a bare number but may still wrap it in text
    # ("groundedness_score: 0.8"); take the first number and clamp it.
    found = re.search(r"\d*\.?\d+", str(raw_score))
    if found:
        groundedness_score = min(max(float(found.group()), 0.0), 1.0)
    else:
        groundedness_score = 0.0

    print("groundedness_score: ", groundedness_score)
    state['groundedness_loop_count'] += 1
    print("#########Groundedness Incremented###########")
    state['groundedness_score'] = groundedness_score
    return state
def check_precision(state: Dict) -> Dict:
    """
    Ask the LLM how precisely the response addresses the user's query.

    Args:
        state (Dict): Workflow state; reads 'query' and 'response' and
            increments 'precision_loop_count'.

    Returns:
        Dict: The same state object with 'precision_score' set to a float in
        [0.0, 1.0]. If no number can be found in the model's reply the score
        is 0.0, so the workflow treats the answer as imprecise instead of
        crashing.
    """
    import re  # local import keeps this block self-contained

    print("---------check_precision---------")
    system_message = """
You are an AI assistant tasked with evaluating the precision of response based on the user’s query.
Your goal is to determine if the precisely addresses the user’s query.
Given user's query and response, verify if the response precisely addresses the user query.
Guidelines for scoring:
Give the response a score of one decimal point between 0.0 and 1.0 based on the following criteria:
- 1.0 **: The response is precisely addressing the user's query.
- 0.0 **: The response, by no means, address the user's query.
Evaluate the given response against the user's query and return a **precision_score** based on the above criteria.
Stricly just return the precision_score and do not explain your response.
"""
    precision_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nResponse: {response}\n\nPrecision score:")
    ])
    chain = precision_prompt | llm | StrOutputParser()
    raw_score = chain.invoke({
        "query": state['query'],
        "response": state['response'],
    })

    # The model is asked for a bare number but may still wrap it in text
    # ("precision_score: 0.8"); take the first number and clamp it.
    found = re.search(r"\d*\.?\d+", str(raw_score))
    if found:
        precision_score = min(max(float(found.group()), 0.0), 1.0)
    else:
        precision_score = 0.0

    state['precision_score'] = precision_score
    print("precision_score:", precision_score)
    state['precision_loop_count'] += 1
    print("#########Precision Incremented###########")
    return state
def refine_response(state: Dict) -> Dict:
    """
    Collect LLM suggestions for improving the current response.

    Args:
        state (Dict): Workflow state; reads 'query' and 'response'.

    Returns:
        Dict: The same state object with 'feedback' set to the previous
        response followed by the model's suggestions.
    """
    print("---------refine_response---------")
    critic_instructions = '''
Given following query and response what improvements can be made to enhance accuracy and completeness of the generated response?
'''
    critic_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", critic_instructions),
            (
                "user",
                "Query: {query}\nResponse: {response}\n\n"
                "What improvements can be made to enhance accuracy and completeness?",
            ),
        ]
    )
    critic = critic_prompt | llm | StrOutputParser()

    previous = state['response']
    suggestions = critic.invoke({'query': state['query'], 'response': state['response']})
    feedback = f"Previous Response: {previous}\nSuggestions: {suggestions}"

    print("feedback: ", feedback)
    print(f"State: {state}")  # printed before 'feedback' is written back
    state['feedback'] = feedback
    return state
def refine_query(state: Dict) -> Dict:
    """
    Collect LLM suggestions for improving the expanded query.

    Args:
        state (Dict): Workflow state; reads 'query', 'expanded_query' and
            'groundedness_loop_count'.

    Returns:
        Dict: The same state object with 'query_feedback' set to the previous
        expanded query followed by the model's suggestions. 'expanded_query'
        itself is left untouched.
    """
    print("---------refine_query---------")
    critic_instructions = '''
Given following query and expanded_query that is generated for improve search result,
your task is to provide improvements that can be made to expanded_query and enhance search precision.
'''
    critic_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", critic_instructions),
            (
                "user",
                "Original Query: {query}\nExpanded Query: {expanded_query}\n\n"
                "What improvements can be made for a better search?",
            ),
        ]
    )
    critic = critic_prompt | llm | StrOutputParser()

    previous = state['expanded_query']
    suggestions = critic.invoke(
        {'query': state['query'], 'expanded_query': state['expanded_query']}
    )
    query_feedback = f"Previous Expanded Query: {previous}\nSuggestions: {suggestions}"

    print("query_feedback: ", query_feedback)
    print(f"Groundedness loop count: {state['groundedness_loop_count']}")
    state['query_feedback'] = query_feedback
    return state
def should_continue_groundedness(state):
    """Pick the node that follows groundedness scoring.

    Returns "check_precision" when the score is at least 0.9,
    "max_iterations_reached" when the loop counter exceeds 'loop_max_iter',
    and "refine_response" otherwise.
    """
    print("---------should_continue_groundedness---------")
    print("groundedness loop count: ", state['groundedness_loop_count'])

    # Good enough: hand over to the precision check.
    if state['groundedness_score'] >= 0.9:
        print("Moving to precision")
        return "check_precision"

    # Out of attempts: stop refining.
    if state["groundedness_loop_count"] > state['loop_max_iter']:
        return "max_iterations_reached"

    print("---------Groundedness Score Threshold Not met. Refining Response-----------")
    return "refine_response"
def should_continue_precision(state: Dict) -> str:
    """Pick the step that follows precision scoring.

    Returns "pass" when the score is at least 0.9, "max_iterations_reached"
    when the loop counter exceeds 'loop_max_iter', and "refine_query"
    otherwise.
    """
    print("---------should_continue_precision---------")
    print("precision loop count: ", state['precision_loop_count'])

    # Precise enough: the workflow can finish.
    if state['precision_score'] >= 0.9:
        return "pass"

    # Out of attempts: stop refining.
    if state['precision_loop_count'] > state['loop_max_iter']:
        return "max_iterations_reached"

    print("---------Precision Score Threshold Not met. Refining Query-----------")
    return "refine_query"
def max_iterations_reached(state: Dict) -> Dict:
    """Replace the response with a fixed fallback message once the loop budget is spent.

    Args:
        state (Dict): Workflow state.

    Returns:
        Dict: The same state object with 'response' overwritten.
    """
    print("---------max_iterations_reached---------")
    state['response'] = (
        "I'm unable to refine the response further. "
        "Please provide more context or clarify your question."
    )
    return state
| from langgraph.graph import END, StateGraph, START | |
def create_workflow() -> StateGraph:
    """Build (but do not compile) the LangGraph workflow of the nutrition agent.

    Returns:
        StateGraph: Graph over AgentState with all nodes and edges registered.
    """
    graph = StateGraph(AgentState)

    # Node name -> handler, in pipeline order.
    steps = (
        ("expand_query", expand_query),
        ("retrieve_context", retrieve_context),
        ("craft_response", craft_response),
        ("score_groundedness", score_groundedness),
        ("refine_response", refine_response),
        ("check_precision", check_precision),
        ("refine_query", refine_query),
        ("max_iterations_reached", max_iterations_reached),
    )
    for step_name, handler in steps:
        graph.add_node(step_name, handler)

    # Straight-line part of the pipeline.
    main_path = (START, "expand_query", "retrieve_context", "craft_response", "score_groundedness")
    for source, target in zip(main_path, main_path[1:]):
        graph.add_edge(source, target)

    # After groundedness scoring: go on, retry the answer, or give up.
    graph.add_conditional_edges(
        "score_groundedness",
        should_continue_groundedness,
        {
            "check_precision": "check_precision",
            "refine_response": "refine_response",
            "max_iterations_reached": "max_iterations_reached",
        },
    )
    graph.add_edge("refine_response", "craft_response")  # retry with feedback

    # After precision scoring: finish, retry the query, or give up.
    graph.add_conditional_edges(
        "check_precision",
        should_continue_precision,
        {
            "pass": END,
            "refine_query": "refine_query",
            "max_iterations_reached": "max_iterations_reached",
        },
    )
    graph.add_edge("refine_query", "expand_query")  # re-expand with feedback
    graph.add_edge("max_iterations_reached", END)

    return graph
| #=========================== Defining the agentic rag tool ====================# | |
# Compile the graph once at import time; every tool call reuses it.
WORKFLOW_APP = create_workflow().compile()


# Registered as a LangChain tool: NutritionBot passes this object in the
# `tools` list of its AgentExecutor, which expects tool instances rather than
# bare functions. The docstring below doubles as the tool description.
@tool
def agentic_rag(query: str):
    """
    Runs the RAG-based agent workflow for a nutrition-disorder query.

    Args:
        query (str): The current user query.

    Returns:
        Dict[str, Any]: The final workflow state, including the generated response.
    """
    # Fresh state for every call.
    inputs = {
        "query": query,
        "expanded_query": "",
        "context": [],
        "response": "",
        "precision_score": 0.0,
        "groundedness_score": 0.0,
        # Loop counters start at zero. Starting them at the limit (3) made the
        # first increment exceed `loop_max_iter`, so the first sub-threshold
        # score ended the run without a single refinement pass.
        "groundedness_loop_count": 0,
        "precision_loop_count": 0,
        "feedback": "",
        "query_feedback": "",
        "loop_max_iter": 3,  # counter value beyond which the workflow gives up
    }
    output = WORKFLOW_APP.invoke(inputs)
    return output
| # Function to filter user input with Llama Guard | |
def filter_input_with_llama_guard(user_input, model="meta-llama/llama-guard-4-12b"):
    """
    Send the user's input to a Llama Guard model through the Groq client.

    Parameters:
    - user_input: The text provided by the user.
    - model: Model identifier passed to the API (default "meta-llama/llama-guard-4-12b").

    Returns:
    - The stripped text of the model's first reply, or None if anything goes
      wrong (the error is printed, not raised).
    """
    chat_messages = [{"role": "user", "content": user_input}]
    try:
        completion = llama_guard_client.chat.completions.create(
            model=model,
            messages=chat_messages,
        )
        verdict = completion.choices[0].message.content
        return verdict.strip()
    except Exception as e:
        # Best-effort: report the failure and let the caller handle None.
        print(f"Error with Llama Guard: {e}")
        return None
| #============================= Adding Memory to the agent using mem0 ===============================# | |
class NutritionBot:
    """Chat agent that combines the agentic_rag tool with mem0-backed memory."""

    def __init__(self):
        """
        Initialize the NutritionBot: memory client, chat model, tools and the agent executor.
        """
        # Memory client used to store and search past customer interactions
        self.memory = MemoryClient(api_key=MEM0_api_key)

        # Azure OpenAI chat model that drives the agent
        self.client = AzureChatOpenAI(
            model_name=model_name,
            api_key=api_key,
            azure_endpoint=endpoint,
            api_version=api_version,
            temperature=0,
        )

        # The only tool offered to the agent is the RAG workflow
        tools = [agentic_rag]

        # System prompt that sets the behavior of the chatbot
        system_prompt = """You are a caring and knowledgeable Medical Support Agent, specializing in nutrition disorder-related guidance. Your goal is to provide accurate, empathetic, and tailored nutritional recommendations while ensuring a seamless customer experience.
Guidelines for Interaction:
Maintain a polite, professional, and reassuring tone.
Show genuine empathy for customer concerns and health challenges.
Reference past interactions to provide personalized and consistent advice.
Engage with the customer by asking about their food preferences, dietary restrictions, and lifestyle before offering recommendations.
Ensure consistent and accurate information across conversations.
If any detail is unclear or missing, proactively ask for clarification.
Always use the agentic_rag tool to retrieve up-to-date and evidence-based nutrition insights.
Keep track of ongoing issues and follow-ups to ensure continuity in support.
Your primary goal is to help customers make informed nutrition decisions that align with their health conditions and personal preferences.
"""

        # Prompt template: system instructions, the human turn, and the
        # scratchpad slot the tool-calling agent fills with intermediate steps
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}"),
        ])

        # Agent plus the executor that runs its tool calls
        agent = create_tool_calling_agent(self.client, tools, prompt)
        self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

    def store_customer_interaction(self, user_id: str, message: str, response: str, metadata: Dict = None):
        """
        Store one question/answer exchange in memory.

        Args:
            user_id (str): Unique identifier for the customer.
            message (str): Customer's query or message.
            response (str): Chatbot's response.
            metadata (Dict, optional): Extra metadata for the interaction. The
                caller's dict is not modified.
        """
        # Work on a copy so the timestamp does not leak into the caller's dict
        metadata = dict(metadata) if metadata else {}
        metadata["timestamp"] = datetime.now().isoformat()

        conversation = [
            {"role": "user", "content": message},
            {"role": "assistant", "content": response},
        ]

        self.memory.add(
            conversation,
            user_id=user_id,
            output_format="v1.1",
            metadata=metadata,
        )

    def get_relevant_history(self, user_id: str, query: str) -> List[Dict]:
        """
        Retrieve up to five stored memories relevant to the current query.

        Args:
            user_id (str): Unique identifier for the customer.
            query (str): The customer's current query.

        Returns:
            List[Dict]: Memory records for this user (possibly empty).
        """
        results = self.memory.search(
            query=query,
            user_id=user_id,
            limit=5,
        )
        # The client may answer with a bare list or with a {"results": [...]}
        # envelope; normalise so callers always receive a list.
        if isinstance(results, dict):
            results = results.get("results", [])
        return list(results or [])

    def handle_customer_query(self, user_id: str, query: str) -> str:
        """
        Answer a customer's query, using stored memories as extra context.

        Args:
            user_id (str): Unique identifier for the customer.
            query (str): Customer's query.

        Returns:
            str: Chatbot's response.
        """
        relevant_history = self.get_relevant_history(user_id, query)

        # One entry per memory. Each record carries a single 'memory' text, so
        # it is listed once rather than repeated under "Customer" and
        # "Support" labels. Records without that text are skipped.
        context = "Previous relevant interactions:\n"
        for record in relevant_history:
            remembered = record.get("memory") if isinstance(record, dict) else None
            if not remembered:
                continue
            context += f"Memory: {remembered}\n"
            context += "---\n"

        # Print context for debugging purposes
        print("Context: ", context)

        # Combine past context with the current query
        prompt = (
            "\nContext:\n"
            f"{context}\n"
            f"Current customer query: {query}\n"
            "Provide a helpful response that takes into account any relevant past interactions.\n"
        )

        response = self.agent_executor.invoke({"input": prompt})
        answer = response["output"]

        # Remember this exchange for future conversations
        self.store_customer_interaction(
            user_id=user_id,
            message=query,
            response=answer,
            metadata={"type": "support_query"},
        )
        return answer
| #=====================User Interface using streamlit ===========================# | |
# Guard verdicts that are allowed through to the agent. The "S6" and "S7"
# categories are deliberately bypassed so the bot can work effectively.
_ALLOWED_GUARD_VERDICTS = frozenset({"safe", "unsafe S7", "unsafe S6"})


def _show_and_record(role, content):
    """Render one chat bubble and append it to the session's chat history."""
    with st.chat_message(role):
        st.write(content)
    st.session_state.chat_history.append({"role": role, "content": content})


def nutrition_disorder_streamlit():
    """
    A Streamlit-based UI for the Nutrition Disorder Specialist Agent.

    Shows a login form until a user name is stored in the session, then a
    chat view. Each query is screened with `filter_input_with_llama_guard`;
    allowed queries are answered by a `NutritionBot` kept in session state.
    Typing 'exit' logs the user out and returns to the login form.
    """
    st.title("Nutrition Disorder Specialist")
    st.write("Ask me anything about nutrition disorders, symptoms, causes, treatments, and more.")
    st.write("Type 'exit' to end the conversation.")

    # Initialize session state for chat history and user_id if they don't exist
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if 'user_id' not in st.session_state:
        st.session_state.user_id = None

    # Login form: only if user is not logged in
    if st.session_state.user_id is None:
        with st.form("login_form", clear_on_submit=True):
            # Strip so a whitespace-only name is not accepted as a login
            user_id = st.text_input("Please enter your name to begin:").strip()
            submit_button = st.form_submit_button("Login")
            if submit_button and user_id:
                st.session_state.user_id = user_id
                st.session_state.chat_history.append({
                    "role": "assistant",
                    "content": f"Welcome, {user_id}! How can I help you with nutrition disorders today?"
                })
                st.session_state.login_submitted = True  # Set flag to trigger rerun
        # The rerun is triggered outside the form block, via the flag set above
        if st.session_state.get("login_submitted", False):
            st.session_state.pop("login_submitted")
            st.rerun()
        return

    # Display chat history
    for message in st.session_state.chat_history:
        with st.chat_message(message["role"]):
            st.write(message["content"])

    # Chat input with custom placeholder text
    user_query = st.chat_input("Type your question here (or 'exit' to end)...")
    if not user_query:
        return

    if user_query.strip().lower() == "exit":
        with st.chat_message("user"):
            st.write("exit")
        goodbye_msg = "Goodbye! Feel free to return if you have more questions about nutrition disorders."
        with st.chat_message("assistant"):
            st.write(goodbye_msg)
        # Log out and drop the transcript so the next person who logs in
        # during this session does not see the previous conversation.
        st.session_state.chat_history = []
        st.session_state.user_id = None
        st.rerun()
        return

    _show_and_record("user", user_query)

    # Filter input through Llama Guard. Collapse all whitespace (including
    # leading/trailing newlines) so e.g. "\n\nsafe" or "unsafe\nS6\n" still
    # match the allow-list instead of being rejected as inappropriate.
    verdict = " ".join(filter_input_with_llama_guard(user_query).split())
    # NOTE(review): looks like leftover debug output — confirm before removing
    st.write(verdict)

    if verdict in _ALLOWED_GUARD_VERDICTS:
        try:
            # Create the bot lazily and reuse it for the rest of the session
            if 'chatbot' not in st.session_state:
                st.session_state.chatbot = NutritionBot()
            reply = st.session_state.chatbot.handle_customer_query(
                st.session_state.user_id, user_query
            )
        except Exception as e:
            # UI boundary: report the failure in the chat instead of crashing
            reply = f"Sorry, I encountered an error while processing your query. Please try again. Error: {str(e)}"
    else:
        reply = "I apologize, but I cannot process that input as it may be inappropriate. Please try again."

    # Render in an assistant bubble so the first display matches the replay
    _show_and_record("assistant", reply)
# Launch the UI only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    nutrition_disorder_streamlit()