From ac07aa74529d0ecd01f89540bcb79af0ca634d21 Mon Sep 17 00:00:00 2001 From: Maxime MORGE <maxime.morge@univ-lille.fr> Date: Tue, 18 Mar 2025 17:09:22 +0100 Subject: [PATCH] Dictator Consistency First Try --- src/dictator/dictator_consistency.py | 172 +++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) create mode 100644 src/dictator/dictator_consistency.py diff --git a/src/dictator/dictator_consistency.py b/src/dictator/dictator_consistency.py new file mode 100644 index 0000000..59ada6b --- /dev/null +++ b/src/dictator/dictator_consistency.py @@ -0,0 +1,172 @@ +import os +import asyncio +from typing import Dict, List +from pydantic import BaseModel +from autogen_agentchat.agents import AssistantAgent +from autogen_agentchat.messages import TextMessage +from autogen_core import CancellationToken +from autogen_ext.models.openai import OpenAIChatCompletionClient +import json +import random +import numpy as np +from scipy.optimize import minimize + +# Load API key from environment variable +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +if not OPENAI_API_KEY: + raise ValueError("Missing OPENAI_API_KEY. Set it as an environment variable.") + +# Define the expected response format as a Pydantic model +class AgentResponse(BaseModel): + my_share: float + other_share: float + reasoning: str + +# The dictator game simulation class +class DictatorConsistency: + def __init__(self, model: str, temperature: float, random=False, max_retries: int = 3): + self.model = model + self.temperature = temperature + self.strategy = random + self.max_retries = max_retries # Maximum retry attempts in case of hallucinations + + if not random: + is_openai_model = model.startswith("gpt") + base_url = "https://api.openai.com/v1" if is_openai_model else "http://localhost:11434/v1" + + model_info = { + "temperature": self.temperature, + "function_calling": True, + "parallel_tool_calls": True, + "family": "unknown", + "json_output": True, + "vision": False + } + + self.model_client = OpenAIChatCompletionClient( + model=self.model, + base_url=base_url, + api_key=OPENAI_API_KEY, + model_info=model_info, + response_format=AgentResponse + ) + + async def run(self, m: float, n: float) -> Dict: + """Runs the model if strategy is False, otherwise uses a classical method.""" + if self.strategy: + return self.apply_random(m, n) + + instruction = f""" + You have $100 to allocate between yourself and an anonymous person. + - Allocating 1 point for yourself is worth {m} dollars. + - Allocating 1 point for the other person is worth {n} dollars. + Your goal is to decide how to allocate these $100. + + Example: + - If `m = 1` and `n = 2`, then for every point you allocate to yourself, you receive $1, and for every point you allocate to the other person, they receive $2. + - If you choose to allocate 30 points to yourself, then you would receive 30 * 1 = $30, and the other person would receive $100 - $30 = $70 . + - Thus, your total allocation would be 30 points for yourself and 70/2 = 35 points for the other person, totaling $100. + + + Please provide: + - How many points you want to allocate to yourself (out of $100), + - How many points you want to allocate to the other person. + + Your response should be in JSON format with `my_share`, `other_share`, and `reasoning`. + """ + + for attempt in range(self.max_retries): + agent = AssistantAgent( + name="Dictator", + model_client=self.model_client, + system_message="You are a helpful assistant. You will be given 25 rounds of decision-making tasks and will be responsible for making decisions. You should use your best judgment to come up with solutions that you like most." + ) + + response = await agent.on_messages( + [TextMessage(content=instruction, source="user")], + cancellation_token=CancellationToken(), + ) + + try: + response_data = response.chat_message.content + agent_response = AgentResponse.model_validate_json(response_data) # Parse JSON + my_share, other_share = agent_response.my_share, agent_response.other_share + print(f"Response (Attempt {attempt+1}): {response_data}") + # Validate values: ensure they sum to $100 considering the values M and N + if 0 <= my_share and 0 <= other_share : + total_value = my_share * m + other_share * n + if abs(total_value - 100.0) < 1e-6: # Use a tolerance for floating-point comparison + return agent_response.model_dump() + else: + print(f"Invalid response detected (Attempt {attempt+1}): Total value {total_value} != 100.") + else: + print(f"Invalid response detected (Attempt {attempt+1}): {response_data}") + except Exception as e: + print(f"Error parsing response (Attempt {attempt+1}): {e}") + + raise ValueError("Model failed to provide a valid response after multiple attempts.") + + def apply_random(self, m:int, n:int) -> Dict: + """Generates a response.""" + my_share = (random.uniform(0, 10)) + other_share = (100 - my_share * m) / n + # Validate values: ensure they sum to $100 considering the values M and N + if 0 <= my_share and 0 <= other_share: + total_value = my_share * m + other_share * n + if abs(total_value - 100.0) < 1e-6: # Use a tolerance for floating-point comparison + return { + "my_share": my_share, + "other_share": other_share, + "reasoning": "Random choice" + } + else: + return { + "my_share": my_share, + "other_share": other_share, + "reasoning": "choices outside budget" + + } + + async def run_rounds(self, nb_rounds: int) -> List[Dict]: + """Runs the dictator game for n rounds and returns the results.""" + results = [] + prices = [] + choices = [] + for _ in range(nb_rounds): + m = round(random.uniform(0.1, 1.0), 1) # Random price for self + n = round(random.uniform(0.1, 1.0), 1) # Random price for other + print(f"m, n: {m}, {n}") + result = await self.run(m, n) + print(f"result: {result}") + results.append(result) + prices.append([m, n]) + choices.append([result['my_share'], result['other_share']]) + + ccei_value = self.compute_ccei(prices, choices) + print(f"prices: {prices}") + print(f"choices: {choices}") + print(f"CCEI: {ccei_value}") + return results + + def compute_ccei(self, prices: List[List[float]], choices: List[List[float]]) -> float: + """Computes the Critical Cost Efficiency Index (CCEI).""" + def objective(lambda_val): + return lambda_val[0] + + constraints = [] + for t, (p_t, x_t) in enumerate(zip(prices, choices)): + for s, x_s in enumerate(choices): + if t != s: + constraints.append({ + 'type': 'ineq', + 'fun': lambda lambda_val, p_t=p_t, x_s=x_s, x_t=x_t: lambda_val[0] * np.dot(p_t, x_t) - np.dot(p_t, x_s) + }) + + result = minimize(objective, [1], constraints=constraints, bounds=[(0, 1)]) + return result.x[0] if result.success else 0 + +# Run the async function and return the response +if __name__ == "__main__": + game_agent = DictatorConsistency(model="mistral-small", temperature=0.7, random=True) # Toggle strategy here + responses = asyncio.run(game_agent.run_rounds(10)) + print(json.dumps(responses, indent=2)) -- GitLab