import os import asyncio import json import logging import requests from typing import Dict from pydantic import BaseModel from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.messages import TextMessage from autogen_core import CancellationToken from autogen_ext.models.openai import OpenAIChatCompletionClient import re # Enable debug logging #logging.basicConfig(level=logging.DEBUG) # Load API keys from environment variables OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") if not OPENAI_API_KEY: raise ValueError("Missing OPENAI_API_KEY. Set it as an environment variable.") PAGODA_API_KEY = os.getenv("PAGODA_API_KEY") if not PAGODA_API_KEY: raise ValueError("Missing PAGODA_API_KEY. Set it as an environment variable.") # Define the expected response format as a Pydantic model class AgentResponse(BaseModel): my_share: int other_share: int motivation: str # The dictator game simulation class class Dictator: def __init__(self, amount: int, model: str, temperature: float, strategy=False, max_retries: int = 3): self.debug = False self.amount = amount self.model = model self.temperature = temperature self.strategy = strategy self.max_retries = max_retries is_openai_model = model.startswith("gpt") is_pagoda_model = ":" in model base_url = ( "https://api.openai.com/v1" if is_openai_model else "https://ollama-ui.pagoda.liris.cnrs.fr/ollama/api/generate" if is_pagoda_model else "http://localhost:11434/v1" ) self.base_url = base_url if self.debug: print(f"Using model: {model}") print(f"Using base URL: {base_url}") key = OPENAI_API_KEY if is_openai_model else PAGODA_API_KEY model_info = { "temperature": self.temperature, "function_calling": True, "parallel_tool_calls": False, "family": "unknown", "json_output": True, "vision": False } self.model_client = OpenAIChatCompletionClient( model=self.model, base_url=base_url, api_key=key, model_info=model_info, response_format=AgentResponse ) async def run(self) -> Dict: """Runs the model if strategy is False, otherwise uses a classical method.""" if self.strategy: return self.apply_strategy() instruction = f""" The decision maker is matched with an anonymous subject and there is no feedback. The decision maker has 100 points that need to be allocated between him/herself and the other one. The decision maker will get return from the points allocated to him/herself and the other one will get return from the points allocated to him/her. First please only tell me the number of points you want to allocate to yourself, then please only tell me the number of points you want to allocate to the other. Return the response in JSON format with `my_share`, `other_share`, and `motivation`. """ is_pagoda_model = ":" in self.model if is_pagoda_model: return await self.run_pagoda(instruction) for attempt in range(self.max_retries): agent = AssistantAgent( name="Dictator", model_client=self.model_client, system_message="You are a helpful assistant." ) response = await agent.on_messages( [TextMessage(content=instruction, source="user")], cancellation_token=CancellationToken(), ) try: # Correct: get the content from the chat message raw_text = response.chat_message.content # Debug: show the raw content print(f"Raw content (Attempt {attempt + 1}): {raw_text}") # Try to load JSON directly try: response_json = json.loads(raw_text) except json.JSONDecodeError: # If it's wrapped in ```json ... ```, extract it match = re.search(r'```json\s*(.*?)\s*```', raw_text, re.DOTALL) if match: response_json = json.loads(match.group(1)) else: print(f"Could not parse JSON from response (Attempt {attempt + 1})") continue agent_response = AgentResponse(**response_json) my_share, other_share = agent_response.my_share, agent_response.other_share # Validate shares if 0 <= my_share <= self.amount and 0 <= other_share <= self.amount and my_share + other_share <= self.amount: return agent_response.dict() else: print(f"Invalid values in response (Attempt {attempt + 1}): {response_json}") except Exception as e: print(f"Error in OpenAI response handling (Attempt {attempt + 1}): {e}") raise ValueError("Model failed to provide a valid response after multiple attempts.") async def run_pagoda(self, instruction) -> Dict: """Runs the Pagoda model using a direct request.""" url = self.base_url headers = { "Authorization": f"Bearer {PAGODA_API_KEY}", "Content-Type": "application/json" } payload = { "model": self.model, "temperature": self.temperature, "prompt": instruction, "stream": False, "response_format": { "type": "json_schema", "json_schema": { "name": "AgentResponse", "strict": True, "schema": { "title": "AgentResponse", "type": "object", "properties": { "my_share": { "title": "My Share", "type": "integer" }, "other_share": { "title": "Other Share", "type": "integer" }, "motivation": { "title": "Motivation", "type": "string" } }, "required": ["my_share", "other_share", "motivation"], "additionalProperties": False } } } } for attempt in range(self.max_retries): try: response = requests.post(url, headers=headers, json=payload) response.raise_for_status() # Get the JSON response response_data = response.json() # Debug: print the raw response to check if fields are missing or named differently if self.debug: print(f"Raw response (Attempt {attempt+1}): {response_data}") # The response field should be parsed correctly if it's already valid JSON response_json = response_data.get('response', '') # If the response is a string containing JSON, we need to extract and parse it if isinstance(response_json, str): # Try to parse the response as JSON try: response_dict = json.loads(response_json) except json.JSONDecodeError: # If the response is not valid JSON, apply regex to extract the JSON portion match = re.search(r"```json(.*?)```", response_json, re.DOTALL) if match: response_dict = json.loads(match.group(1)) else: print(f"Invalid response format detected (Attempt {attempt + 1}): {response_json}") continue elif isinstance(response_json, dict): # If response_json is already a dictionary, just use it response_dict = response_json else: print(f"Unexpected format in 'response' field (Attempt {attempt + 1}): {response_json}") continue # Validate the response structure agent_response = AgentResponse(**response_dict) my_share, other_share = agent_response.my_share, agent_response.other_share # Validate that the values are within expected bounds if 0 <= my_share <= self.amount and 0 <= other_share <= self.amount and my_share + other_share <= self.amount: return agent_response.dict() else: print(f"Invalid response detected (Attempt {attempt + 1}): {response_dict}") except Exception as e: print(f"Error in Pagoda request (Attempt {attempt + 1}): {e}") raise ValueError("Pagoda model failed to provide a valid response after multiple attempts.") def apply_strategy(self) -> Dict: """Generates a response based on predefined strategies.""" if self.model == "gpt-4.5-preview-2025-02-27": my_share = int(0.7 * self.amount) # Example rule: keep 70% other_share = self.amount - my_share motivation = f"Using strategy from {self.model}, I chose to keep {my_share} and give {other_share}." return {"my_share": my_share, "other_share": other_share, "motivation": motivation} if self.model in ["llama3", "llama3.3:latest"]: my_share = self.amount // 2 other_share = self.amount - my_share motivation = "I'm being fair and generous!" agent_response = AgentResponse(my_share=my_share, other_share=other_share, motivation=motivation) return agent_response.dict() if self.model in ["mistral-small", "mixtral:8x7b"]: my_share = self.amount // 2 other_share = self.amount - my_share motivation = "The decision is to divide the money equally." agent_response = AgentResponse(my_share=my_share, other_share=other_share, motivation=motivation) return agent_response.dict() if self.model in ["deepseek-r1", "deepseek-r1:7b"]: half_amount = self.amount // 2 return { "my_share": half_amount, "other_share": half_amount, "motivation": "Split equally between both players." } # Run the async function and return the response if __name__ == "__main__": agent = Dictator(amount=100, model="qwen3", temperature=0.7, strategy=False) # "llama3.3:latest", "mixtral:8x7b" response_json = asyncio.run(agent.run()) print(response_json)