import base64
import copy
import json
import os
import random
import time
from functools import wraps
from mimetypes import guess_type
import cv2
import requests
from azure.identity import AzureCliCredential
def retry_with_exponential_backoff(
func,
initial_delay: float = 1,
exponential_base: float = 2,
jitter: bool = True,
max_retries: int = 8,
):
"""Retry a function with exponential backoff."""
def wrapper(*args, **kwargs):
# Initialize variables
num_retries = 0
delay = initial_delay
        # Loop until we get a successful response, exhaust max_retries, or hit a non-retryable error
while True:
try:
return func(*args, **kwargs)
            # Catch all errors, but only retry those that look transient
            except Exception as e:
                error_text = str(e).lower()
                if ("rate limit" in error_text or "timed out" in error_text
                        or "too many requests" in error_text
                        or "forbidden for url" in error_text
                        or "internal" in error_text):
# Increment retries
num_retries += 1
# Check if max retries has been reached
if num_retries > max_retries:
print("Max retries reached. Exiting.")
return None
                    # Grow the delay exponentially, with optional random jitter
                    delay *= exponential_base * (1 + jitter * random.random())
print(f"Retrying in {delay} seconds for {str(e)}...")
# Sleep for the delay
time.sleep(delay)
else:
print(str(e))
return None
return wrapper
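
# Illustrative usage sketch (not called anywhere in this module): any flaky callable can
# be wrapped with the decorator above so that errors that look transient (rate limits,
# timeouts, server-side failures) are retried with exponential backoff. The function
# name and URL below are hypothetical placeholders, not part of the original module.
@retry_with_exponential_backoff
def _example_flaky_get(url: str = "https://example.com/api/health") -> dict:
    """Fetch a JSON resource, retrying transient HTTP errors via the wrapper above."""
    response = requests.get(url, timeout=30)
    # raise_for_status() raises an HTTPError whose message the wrapper inspects
    # (e.g. "Too Many Requests") to decide whether the call should be retried
    response.raise_for_status()
    return response.json()
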
# Function to encode a local image into data URL
def local_image_to_data_url(image_path):
# Guess the MIME type of the image based on the file extension
mime_type, _ = guess_type(image_path)
if mime_type is None:
mime_type = "application/octet-stream" # Default MIME type if none is found
    # Validate that the file is a decodable image before encoding it
    image = cv2.imread(image_path)
if image is None:
raise ValueError(f"Could not read image from path: {image_path}")
# Read and encode the image file
with open(image_path, "rb") as image_file:
base64_encoded_data = base64.b64encode(image_file.read()).decode("utf-8")
# Construct the data URL
return f"data:{mime_type};base64,{base64_encoded_data}"
@retry_with_exponential_backoff
def call_openai_model_with_tools(
messages,
endpoints,
model_name,
api_key: str = None,
tools: list = [], # List of tool definitions
image_paths: list = [],
max_tokens: int = 4096,
temperature: float = 0.0,
tool_choice: str = "auto", # Can be "auto", "none", or a specific tool
return_json: bool = False,
) -> dict:
    """Send a chat-completions request to OpenAI or Azure OpenAI, optionally with tools and images.

    If `api_key` is provided, the public OpenAI API is used; otherwise an Azure AD token
    obtained via `AzureCliCredential` is used against one (or a random one) of `endpoints`.
    Note that `max_tokens` and `temperature` are accepted for interface compatibility but
    are not currently forwarded in the request payload.
    """
    if api_key:
headers = {
"Content-Type": "application/json",
'Authorization': 'Bearer ' + api_key
}
endpoint = "https://api.openai.com/v1"
url = f"{endpoint}/chat/completions"
else:
credential = AzureCliCredential()
token = credential.get_token('https://cognitiveservices.azure.com/')
headers = {
"Content-Type": "application/json",
'Authorization': 'Bearer ' + token.token
}
if isinstance(endpoints, str):
endpoint = endpoints
elif isinstance(endpoints, list):
endpoint = random.choice(endpoints)
else:
raise ValueError("Endpoints must be a string or a list of strings.")
url = f"{endpoint}/openai/deployments/{model_name}/chat/completions?api-version=2025-03-01-preview"
model = model_name
payload = {
"model": model,
"messages": copy.deepcopy(messages),
# "reasoning_effort": reasoning_effort,
}
if return_json:
payload["response_format"] = {"type": "json_object"}
# Add tools to the payload if provided
if tools:
payload["tools"] = tools
payload["tool_choice"] = tool_choice
if image_paths:
# with mp.Pool(processes=min(len(image_paths), mp.cpu_count())) as pool:
# image_data_list = pool.map(local_image_to_data_url, image_paths)
image_data_list = [local_image_to_data_url(image_path) for image_path in image_paths]
payload['messages'].append({"role": "user", "content": []})
for image_data in image_data_list:
payload['messages'][-1]['content'].append({"type": "image_url", "image_url": {"url": image_data}})
response = requests.post(url, headers=headers, json=payload, timeout=600)
if response.status_code != 200:
error_text = response.text
raise Exception(f"OpenAI API returned status {response.status_code}: {error_text}")
response_data = response.json()
# Get the message from the response
message = response_data['choices'][0]['message']
# Check if there's a tool call in the response
if "tool_calls" in message:
# Return the entire message object when tools are being used
return message
else:
# If there's no tool call, just return the text content
return {"content": message['content'].strip(), "tool_calls": None}
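
# Illustrative tool definition (an assumption for demonstration; not used elsewhere in
# this module). The OpenAI tools format describes each callable function with a JSON
# Schema for its parameters; a tool like this one pairs naturally with extract_answer
# below, which looks for an "answer" key in the tool-call arguments.
_EXAMPLE_ANSWER_TOOL = [
    {
        "type": "function",
        "function": {
            "name": "submit_answer",
            "description": "Submit the final answer to the user's question.",
            "parameters": {
                "type": "object",
                "properties": {
                    "answer": {
                        "type": "string",
                        "description": "The final answer text.",
                    }
                },
                "required": ["answer"],
            },
        },
    }
]
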
class AzureOpenAIEmbeddingService:
@staticmethod
@retry_with_exponential_backoff
def get_embeddings(endpoints, model_name, input_text, api_key: str = None):
"""
Call Azure OpenAI Embedding service and get embeddings for the input text.
:param api_key: Your Azure OpenAI API key.
:param endpoint: The endpoint URL for the OpenAI service.
:param model: The model name for generating embeddings.
:param input_text: The text for which you want to generate embeddings.
:return: The embeddings as a JSON response.
"""
if api_key:
headers = {
"Content-Type": "application/json",
'Authorization': 'Bearer ' + api_key
}
endpoint = "https://api.openai.com/v1"
url = f"{endpoint}/embeddings"
else:
if isinstance(endpoints, str):
endpoint = endpoints
elif isinstance(endpoints, list):
endpoint = random.choice(endpoints)
else:
raise ValueError("Endpoints must be a string or a list of strings.")
# Define the URL for the embeddings endpoint
url = f"{endpoint}/openai/deployments/{model_name}/embeddings?api-version=2023-05-15"
credential = AzureCliCredential()
token = credential.get_token('https://cognitiveservices.azure.com/')
headers = {
"Content-Type": "application/json",
'Authorization': 'Bearer ' + token.token
}
model = model_name
# Set up the payload for the request
payload = {
"input": input_text,
"model": model
}
# Make the request to the Azure OpenAI service
response = requests.post(url, headers=headers, data=json.dumps(payload))
# Check if the request was successful
if response.status_code == 200:
return response.json()['data']
else:
response.raise_for_status()
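
# Usage sketch (illustrative; the endpoint and model name below are placeholder assumptions):
#   data = AzureOpenAIEmbeddingService.get_embeddings(
#       endpoints="https://<your-resource>.openai.azure.com",
#       model_name="text-embedding-3-small",
#       input_text="hello world",
#   )
#   vector = data[0]["embedding"] if data else None
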
def extract_answer(message: dict) -> str | None:
"""
Extract the plain-text answer from an assistant message that may include
tool calls.
    If the assistant responded via a tool call, the JSON string stored in
    `message["tool_calls"][i]["function"]["arguments"]` is parsed and the value
    associated with the key `"answer"` is returned. Otherwise the plain
    `content` field of the message is used.
Parameters
----------
message : dict
The assistant message returned by `call_openai_model_with_tools`.
Returns
-------
str | None
The extracted answer, or ``None`` if no answer could be found.
"""
    # Tool-based response (tool_calls may be absent or explicitly None)
    for call in (message.get("tool_calls") or []):
        args_json = call["function"]["arguments"]
        try:
            args = json.loads(args_json)
        except json.JSONDecodeError:
            continue
        if (answer := args.get("answer")):
            return answer
# Direct text response
if (content := message.get("content")):
return content.strip()
return None
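
# Illustrative only: extract_answer handles both response shapes produced by
# call_openai_model_with_tools. The sample dicts below mimic the API structure.
#   extract_answer({"content": " 42 ", "tool_calls": None})            # -> "42"
#   extract_answer({"tool_calls": [
#       {"function": {"arguments": '{"answer": "42"}'}}]})             # -> "42"
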
if __name__ == "__main__":
# Example for Azure
# call_openai_model_with_tools(
# messages=[{"role": "user", "content": "Hello, how are you?"}],
# endpoints=["https://msra-im-openai-eus2.openai.azure.com"],
# model_name="o3",
# tools=[],
# image_paths=[],
# max_tokens=4096,
# temperature=0.0,
# tool_choice="auto",
# return_json=False,
# )
# Example for OpenAI
api_key = os.environ.get("OPENAI_API_KEY")
if api_key:
response = call_openai_model_with_tools(
messages=[{"role": "user", "content": "Hello, how are you?"}],
endpoints=None, # Not used for OpenAI
model_name="gpt-4o",
api_key=api_key,
tools=[],
image_paths=[],
max_tokens=4096,
temperature=0.0,
tool_choice="auto",
return_json=False,
)
print(response)
else:
print("OPENAI_API_KEY environment variable not set.")