import base64 import copy import json import os import random import time from io import BytesIO from mimetypes import guess_type import cv2 import requests from azure.identity import AzureCliCredential def retry_with_exponential_backoff( func, initial_delay: float = 1, exponential_base: float = 2, jitter: bool = True, max_retries: int = 8, ): """Retry a function with exponential backoff.""" def wrapper(*args, **kwargs): # Initialize variables num_retries = 0 delay = initial_delay # Loop until a successful response or max_retries is hit or an exception is raised while True: try: return func(*args, **kwargs) # Raise exceptions for any errors not specified except Exception as e: if "rate limit" in str(e).lower() or "timed out" in str(e) \ or "Too Many Requests" in str(e) or "Forbidden for url" in str(e) \ or "internal" in str(e).lower(): # Increment retries num_retries += 1 # Check if max retries has been reached if num_retries > max_retries: print("Max retries reached. Exiting.") return None # Increment the delay delay *= exponential_base * (1 + jitter * random.random()) print(f"Retrying in {delay} seconds for {str(e)}...") # Sleep for the delay time.sleep(delay) else: print(str(e)) return None return wrapper # Function to encode a local image into data URL def local_image_to_data_url(image_path): # Guess the MIME type of the image based on the file extension mime_type, _ = guess_type(image_path) if mime_type is None: mime_type = "application/octet-stream" # Default MIME type if none is found image = cv2.imread(image_path) if image is None: raise ValueError(f"Could not read image from path: {image_path}") # Read and encode the image file with open(image_path, "rb") as image_file: base64_encoded_data = base64.b64encode(image_file.read()).decode("utf-8") # Construct the data URL return f"data:{mime_type};base64,{base64_encoded_data}" @retry_with_exponential_backoff def call_openai_model_with_tools( messages, endpoints, model_name, api_key: str = None, tools: list = [], # List of tool definitions image_paths: list = [], max_tokens: int = 4096, temperature: float = 0.0, tool_choice: str = "auto", # Can be "auto", "none", or a specific tool return_json: bool = False, ) -> dict: if api_key: headers = { "Content-Type": "application/json", 'Authorization': 'Bearer ' + api_key } endpoint = "https://api.openai.com/v1" url = f"{endpoint}/chat/completions" else: credential = AzureCliCredential() token = credential.get_token('https://cognitiveservices.azure.com/') headers = { "Content-Type": "application/json", 'Authorization': 'Bearer ' + token.token } if isinstance(endpoints, str): endpoint = endpoints elif isinstance(endpoints, list): endpoint = random.choice(endpoints) else: raise ValueError("Endpoints must be a string or a list of strings.") url = f"{endpoint}/openai/deployments/{model_name}/chat/completions?api-version=2025-03-01-preview" model = model_name payload = { "model": model, "messages": copy.deepcopy(messages), # "reasoning_effort": reasoning_effort, } if return_json: payload["response_format"] = {"type": "json_object"} # Add tools to the payload if provided if tools: payload["tools"] = tools payload["tool_choice"] = tool_choice if image_paths: # with mp.Pool(processes=min(len(image_paths), mp.cpu_count())) as pool: # image_data_list = pool.map(local_image_to_data_url, image_paths) image_data_list = [local_image_to_data_url(image_path) for image_path in image_paths] payload['messages'].append({"role": "user", "content": []}) for image_data in image_data_list: payload['messages'][-1]['content'].append({"type": "image_url", "image_url": {"url": image_data}}) response = requests.post(url, headers=headers, json=payload, timeout=600) if response.status_code != 200: error_text = response.text raise Exception(f"OpenAI API returned status {response.status_code}: {error_text}") response_data = response.json() # Get the message from the response message = response_data['choices'][0]['message'] # Check if there's a tool call in the response if "tool_calls" in message: # Return the entire message object when tools are being used return message else: # If there's no tool call, just return the text content return {"content": message['content'].strip(), "tool_calls": None} class AzureOpenAIEmbeddingService: @staticmethod @retry_with_exponential_backoff def get_embeddings(endpoints, model_name, input_text, api_key: str = None): """ Call Azure OpenAI Embedding service and get embeddings for the input text. :param api_key: Your Azure OpenAI API key. :param endpoint: The endpoint URL for the OpenAI service. :param model: The model name for generating embeddings. :param input_text: The text for which you want to generate embeddings. :return: The embeddings as a JSON response. """ if api_key: headers = { "Content-Type": "application/json", 'Authorization': 'Bearer ' + api_key } endpoint = "https://api.openai.com/v1" url = f"{endpoint}/embeddings" else: if isinstance(endpoints, str): endpoint = endpoints elif isinstance(endpoints, list): endpoint = random.choice(endpoints) else: raise ValueError("Endpoints must be a string or a list of strings.") # Define the URL for the embeddings endpoint url = f"{endpoint}/openai/deployments/{model_name}/embeddings?api-version=2023-05-15" credential = AzureCliCredential() token = credential.get_token('https://cognitiveservices.azure.com/') headers = { "Content-Type": "application/json", 'Authorization': 'Bearer ' + token.token } model = model_name # Set up the payload for the request payload = { "input": input_text, "model": model } # Make the request to the Azure OpenAI service response = requests.post(url, headers=headers, data=json.dumps(payload)) # Check if the request was successful if response.status_code == 200: return response.json()['data'] else: response.raise_for_status() def extract_answer(message: dict) -> str | None: """ Extract the plain-text answer from an assistant message that may include tool calls. The function first checks the normal `content` field (for responses that are not using tools). If the assistant responded via a tool call, it attempts to parse the JSON string stored in `message["tool_calls"][i]["function"]["arguments"]` and returns the value associated with the key `"answer"`. Parameters ---------- message : dict The assistant message returned by `call_openai_model_with_tools`. Returns ------- str | None The extracted answer, or ``None`` if no answer could be found. """ # Tool-based response for call in message.get("tool_calls", []): args_json = call["function"]["arguments"] args = json.loads(args_json) if (answer := args.get("answer")): return answer # Direct text response if (content := message.get("content")): return content.strip() return None if __name__ == "__main__": # Example for Azure # call_openai_model_with_tools( # messages=[{"role": "user", "content": "Hello, how are you?"}], # endpoints=["https://msra-im-openai-eus2.openai.azure.com"], # model_name="o3", # tools=[], # image_paths=[], # max_tokens=4096, # temperature=0.0, # tool_choice="auto", # return_json=False, # ) # Example for OpenAI api_key = os.environ.get("OPENAI_API_KEY") if api_key: response = call_openai_model_with_tools( messages=[{"role": "user", "content": "Hello, how are you?"}], endpoints=None, # Not used for OpenAI model_name="gpt-4o", api_key=api_key, tools=[], image_paths=[], max_tokens=4096, temperature=0.0, tool_choice="auto", return_json=False, ) print(response) else: print("OPENAI_API_KEY environment variable not set.")