Source code for psychsim.world

from __future__ import print_function
import bz2
import copy
import os
import pickle
from typing import Optional
from xml.dom.minidom import Node, parseString

from psychsim.action import act2dict, Action, ActionSet
from psychsim.probability import Distribution
from psychsim.pwl.keys import makeFuture, modelKey
from psychsim.pwl import *
from psychsim.agent import Agent
import psychsim.graph
try:
    from psychsim.ui.diagram import Diagram
except:
    pass


[docs]class World(object): """ :ivar agents: table of agents in this world, indexed by name :type agents: strS{->}L{Agent} :ivar state: the distribution over states of the world :type state: ``psychsim.pwl.state.VectorDistributionSet`` :ivar variables: definitions of the domains of possible variables (state features, relationships, observations) :type variables: dict :ivar symbols: utility storage of symbols used across all enumerated state variables :type symbols: dict :ivar dynamics: table of action effect models :type dynamics: dict :ivar dependency: dependency structure among state features that impose temporal constraints :type dependency: ``psychsim.graph.DependencyGraph`` :ivar history: accumulated list of outcomes from simulation steps :type history: list :ivar termination: list of conditions under which the simulation terminates (default is none) :type termination: ``psychsim.pwl.tree.KeyedTree[]`` """ memory = False def __init__(self, xml=None,stateType=VectorDistributionSet): """ :param xml: Initialization argument, either an XML Element, or a filename :type xml: Node or str :param stateType: Class used for the world state """ self.agents = {} # State feature information if issubclass(stateType,KeyedVector): self.state = stateType({CONSTANT: 1}) elif issubclass(stateType,VectorDistribution): self.state = stateType({KeyedVector({CONSTANT: 1}): 1}) else: self.state = stateType() self.variables = {} self.locals = {} self.symbols = {} self.symbolList = [] self.termination = [] self.relations = {} # Turn order state info self.maxTurn = None self.turnKeys = set() # Action effect information self.dynamics = {} self.conditionalDynamics = {} self.newDynamics = {True: []} self.dependency = psychsim.graph.DependencyGraph(self) self.history = [] self.diagram = None self.extras = {} if isinstance(xml,Node): self.parse(xml) elif isinstance(xml,str): if xml[-4:] == '.xml': # Uncompressed f = open(xml,'r') else: if xml[-4:] != '.psy': xml = '%s.psy' % (xml) f = bz2.BZ2File(xml,'r') doc = parseString(f.read()) f.close() self.parse(doc.documentElement) self.parallel = False
[docs] def initialize(self): self.agents.clear() self.variables.clear() self.locals.clear() self.relations.clear() self.symbols.clear() del self.symbolList[:] self.dynamics.clear() self.dependency.clear() del self.history[:] del self.termination[:] self.state.clear()
[docs] def clearCoords(self): if self.diagram: self.diagram.clear() for variable in self.variables.values(): if 'xpre' in variable: del variable['xpre'] del variable['ypre'] del variable['xpost'] del variable['ypost']
[docs] def setParallel(self, flag=True): """ Turns on multiprocessing when agents have turns in parallel :param flag: multiprocessing is on iff C{True} (default is C{True}) :type flag: bool """ self.parallel = flag
"""------------------""" """Simulation methods""" """------------------"""
[docs] def step(self, actions=None, state=None, real=True, select=False, keySubset=None, horizon=None, tiebreak=None, updateBeliefs=True, debug={}, threshold=None, context='', max_k=None): """ The simulation method :param actions: optional argument setting a subset of actions to be performed in this turn :type actions: strS{->}L{ActionSet} :param state: optional initial state distribution (default is the current world state distribution) :type state: L{VectorDistribution} :param real: if C{True}, then modify the given state; otherwise, this is only hypothetical (default is C{True}) :type real: bool :param float threshold: Outcomes with a likelihood below this threshold are pruned (default is None, no pruning) """ if state is None: state = self.state # Check whether we are already in a terminal state if self.terminated(state): return state self.dependency.getEvaluation() if keySubset is None: keySubset = state.keys() if real is False: raise DeprecationWarning('If you want to do hypothetical reasoning, pass in a copy of the state.') prob = 1 actions = act2dict(actions) # Determine the actions taken by the agents in this world state, policies, choices = self.deltaAction(state, actions, horizon, tiebreak, keySubset, debug, context) # Compute the effect of the chosen actions effect = self.deltaState(choices,state,keySubset) # Update turn order effect.append(self.deltaTurn(state, policies)) for stage in effect: prob *= self.applyEffect(state, stage, select, max_k=max_k) state.make_certain() # The future becomes the present state.rollback() if updateBeliefs: # Update agent models included in the original world # (after finding out possible new worlds) if updateBeliefs is True: agents_modeled = [name for name in self.agents if modelKey(name) in state] else: agents_modeled = updateBeliefs for name in agents_modeled: key = modelKey(name) agent = self.agents[name] agent.updateBeliefs(state, policies, horizon=horizon, context=context) substate = state.keyMap[makeFuture(key)] if select and substate is not None: state.distributions[substate].select(select == 'max') # The future becomes the present state.rollback() if select: prob *= state.select(select == 'max') if threshold is not None: prob *= state.prune_probability(threshold) state.normalize() if self.memory: self.history.append(copy.deepcopy(state)) # self.modelGC(False) return prob
[docs] def deltaAction(self,state=None,actions=None,horizon=None,tiebreak=None,keySubset=None,debug={}, context=''): if state is None: state = self.state if keySubset is None: keySubset = state.keys() choices = {} policies = {} for name in self.agents: turn = keys.turnKey(name) if turn in keySubset: turns = state.domain(turn) if len(turns) != 1: raise ValueError('Unable to process uncertain turns. Come back later.') if 0 in turns: # This agent has a turn now action = keys.actionKey(name) if name in actions: # Translate any pre-specified actions into PWL policy if isinstance(actions[name], Action): actions[name] = ActionSet([actions[name]]) if isinstance(actions[name], ActionSet): choices[name] = [actions[name]] policies[name] = makeTree(setToConstantMatrix(action,actions[name])).desymbolize(self.symbols) else: policies[name] = actions[name] choices[name] = {m[makeFuture(action)][CONSTANT] for m in policies[name].leaves()} else: decision = self.agents[name].decide(state, horizon, actions, None, tiebreak, None, debug=debug.get(name, {}), context=context) if name in debug: debug[name]['__decision__'] = decision try: policies[name] = decision['policy'] choices[name] = {m[makeFuture(action)][CONSTANT] for m in policies[name].leaves()} policies[name] = policies[name].desymbolize(self.symbols) except KeyError: choices[name] = [decision['action']] policies[name] = makeTree(setToConstantMatrix(action,decision['action'])).desymbolize(self.symbols) elif name in actions: raise ValueError('Policy generated for %s out of turn' % (name)) if len(policies) == 0: self.printState(state) raise RuntimeError('Nobody has a turn!') for name,policy in policies.items(): state *= policy return state,policies,choices
[docs] def deltaState(self,actions,state,uncertain=False): """ Computes the change across a subset of state features """ # Figure out the order in which to update vector elements keyOrder = [] for keySet in self.dependency.getEvaluation(): if state is not self.state: keySet = [k for k in keySet if k in state] if len(keySet) > 0: keyOrder.append(keySet) effects = [] for keySet in keyOrder: dynamics = self.getActionEffects(actions,keySet) for key in keySet: if key not in dynamics: dynamics[key] = None effects.append(dynamics) return effects
[docs] def deltaTurn(self,state,actions=None): """ Computes the change in the turn order based on the given actions :param start: The original state :param end: The final state (which will be modified to reflect the new turn order) :type start,end: L{VectorDistributionSet} :returns: The dynamics functions applied to update the turn order """ turnKeys = {k for k in state.keys() if isTurnKey(k)} dynamics = {} for key in turnKeys: dynamics.update(self.getTurnDynamics(key,actions)) return dynamics
[docs] def applyEffect(self, state, effect, select=False, max_k: Optional[int] = None) -> float: if isinstance(select, dict): default_select = select.get('__default__', True) else: default_select = select prob = 1 if isinstance(effect, list): for stage in effect: prob *= self.applyEffect(state, stage, select) else: for key, dynamics in effect.items(): if dynamics is None: pass elif len(dynamics) == 1: tree = dynamics[0] # if select: # if select == 'max': # tree, subprob = tree.sample(True, state) # None if isinstance(state,VectorDistributionSet) else state) # elif select is True: # tree, subprob = tree.sample(False, state) # None if isinstance(state,VectorDistributionSet) else state) # elif default_select and key not in select: # # We are selecting a specific value, just not for this particular state feature # tree, subprob = tree.sample(False, state) # None if isinstance(state,VectorDistributionSet) else state) # else: # subprob = 1 # prob *= subprob for in_key in tree.getKeysIn(): if isFuture(in_key) and in_key not in state: state.copy_value(makePresent(in_key), in_key) try: state *= tree except StopIteration: self.printState(state) print(tree) raise RuntimeError except KeyError: print('Applying effect on %s' % (key)) print('Effect tree is\n%s' % (tree)) raise except ValueError: print('Applying effect on %s' % (key)) print('Effect tree is\n%s' % (tree)) raise else: cumulative = None for tree in dynamics: if cumulative is None: cumulative = copy.deepcopy(tree) else: cumulative.makeFuture([key]) cumulative *= tree tree = cumulative if select: if isinstance(state,VectorDistributionSet): state.__imul__(tree,select) else: if isinstance(tree,KeyedMatrix): state *= tree else: raise TypeError('Unable to generate selective effect from:\n%s' % (tree)) else: state *= tree if isinstance(select, dict) and key in select: if select[key] not in state.marginal(makeFuture(key)): value = self.float2value(key, select[key]) nonzero = ', '.join(['"%s"' % (self.float2value(key, el)) for el in state.marginal(makeFuture(key)).domain()]) raise ValueError(f'Selecting impossible value "{value}" for {key} (nonzero probability for {nonzero})') prob *= state.setitem(makeFuture(key), select[key]) if max_k is not None: prob *= state.prune_size(max_k) return prob
[docs] def addTermination(self,tree,action=True): """ Adds a possible termination condition to the list """ # Temporary deprecation check (TODO: Remove) remaining = [tree] while remaining: subtree = remaining.pop() if subtree.isLeaf(): if isinstance(subtree.children[None],bool): msg = 'Use set%sMatrix(psychsim.pwl.keys.TERMINATED) instead of %s' % \ (subtree.children[None],subtree.children[None]) raise DeprecationWarning(msg) elif subtree.isProbabilistic(): remaining += subtree.children.domain() else: remaining += subtree.children.values() try: dynamics = self.dynamics[TERMINATED] except KeyError: dynamics = self.dynamics[TERMINATED] = {} if action in dynamics and action is True: raise DeprecationWarning('Multiple termination conditions no longer supported. Please merge into single boolean PWL tree.') # Termination state info if not TERMINATED in self.variables: self.defineState(state2agent(TERMINATED), state2feature(TERMINATED), bool, description="True if and only if a termination condition for this simulation is satisfied") self.setFeature(TERMINATED, False) self.setDynamics(TERMINATED, action, tree)
[docs] def terminated(self,state=None): """ Evaluates world states with respect to termination conditions :param state: the state vector (or distribution thereof) to evaluate (default is the current world state) :type state: L{psychsim.pwl.KeyedVector} or L{VectorDistribution} :returns: C{True} iff the given state (or all possible worlds if a distribution) satisfies at least one termination condition :rtype: bool """ if state is None: state = self.state if not TERMINATED in state: return False termination = self.getValue(TERMINATED,state) if isinstance(termination,Distribution): termination = termination[True] == 1. return termination
"""-----------------""" """Authoring methods""" """-----------------"""
[docs] def addAgent(self, agent, setModel=True, avoid_beliefs=True): return self.add_agent(agent, setModel, avoid_beliefs)
[docs] def add_agent(self, agent, setModel=True, avoid_beliefs=True): if isinstance(agent, str): agent = Agent(agent) if self.has_agent(agent): raise NameError(f'Agent {agent.name} already exists in this world') else: self.agents[agent.name] = agent agent.world = self key = modelKey(agent.name) if key not in self.variables: self.defineVariable(key, list, list(agent.models.keys()), avoid_beliefs=avoid_beliefs) if len(agent.models) == 0: # Default model settings agent.addModel('%s0' % (agent.name), R=None, horizon=2, level=2, rationality=1., discount=1., selection='consistent', beliefs=True, parent=None, projector=Distribution.expectation) if setModel: if isinstance(self.state, VectorDistributionSet): # Initialize model of this agent to be uniform distribution (got a better idea?) prob = 1./float(len(agent.models)) dist = {model: prob for model in agent.models} self.setModel(agent.name, dist) else: assert len(agent.models) == 1 self.setModel(agent.name, next(iter(agent.models.keys()))) return agent
[docs] def has_agent(self,agent): """ :param agent: The agent (or agent name) to look for :type agent: L{Agent} or str :returns: C{True} iff this C{World} already has an agent with the same name :rtype: bool """ if isinstance(agent,str): return agent in self.agents else: return agent.name in self.agents
[docs] def setTurnDynamics(self,name,action,tree): """ Convenience method for setting custom dynamics for the turn order :param name: the name of the agent whose turn dynamics are being set :type name: str :param action: the action affecting the turn order :type action: L{Action} or L{ActionSet} :param tree: the decision tree defining the effect on this agent's turn order :type tree: L{psychsim.pwl.KeyedTree} """ if self.maxTurn is None: raise ValueError('Call setOrder before setting turn dynamics') key = turnKey(name) if not key in self.variables: self.defineVariable(key,int,hi=self.maxTurn,evaluate=False) self.setDynamics(key,action,tree)
[docs] def addDynamics(self,tree,action=True,enforceMin=False,enforceMax=False): if isinstance(action,Action): action = ActionSet(action) assert action is True or isinstance(action,ActionSet),'Action must be True/ActionSet/Action for addDynamics' if not action in self.newDynamics: self.newDynamics[action] = [] tree = tree.desymbolize(self.symbols) keysIn = tree.getKeysIn() keysOut = tree.getKeysOut()
[docs] def setDynamics(self,key,action,tree,enforceMin=False,enforceMax=False,codePtr=False): """ Defines the effect of an action on a given state feature :param key: the key of the affected state feature :type key: str :param action: the action affecting the state feature :type action: L{Action} or L{ActionSet} :param tree: the decision tree defining the effect :type tree: L{psychsim.pwl.KeyedTree} :param codePtr: if C{True}, tags the dynamics with a pointer to the module and line number where the tree is defined :type codePtr: bool """ # logging.warning('setDynamics will soon be deprecated. Please migrate to using addDynamics instead.') if isinstance(action,str): raise TypeError('Incorrect action type in setDynamics call, perhaps due to change in method definition. Please use a key string as the first argument, rather than the more limiting entity/feature combination.') if isinstance(tree,dict): raise TypeError('Tree passed in to setDynamics is a dictionary. Perhaps you forgot to call makeTree first?') if not isinstance(action,ActionSet) and not action is True: if not isinstance(action,Action): # dict -> Action action = Action(action) # Action -> ActionSet action = ActionSet([action]) assert key in self.variables,'No state element "%s"' % (key) # if not action is True: # for atom in action: # assert atom['subject'] in self.agents,\ # 'Unknown actor %s' % (atom['subject']) # assert self.agents[atom['subject']].hasAction(atom),\ # 'Unknown action %s' % (atom) if not key in self.dynamics: self.dynamics[key] = {} if action not in self.dynamics: self.dynamics[action] = {} # if action is not True and len(action) == 1 and next(iter(action)) not in self.dynamics: # self.dynamics[next(iter(action))] = {} # Translate symbolic names into numeric values tree = tree.desymbolize(self.symbols) if enforceMin and self.variables[key]['domain'] in [int,float]: # Modify tree to enforce floor tree.floor(key,self.variables[key]['lo']) if enforceMax and self.variables[key]['domain'] in [int,float]: # Modify tree to enforce ceiling tree.ceil(key,self.variables[key]['hi']) self.dynamics[key][action] = tree self.dynamics[action][key] = tree # if action is not True and len(action) == 1: # self.dynamics[next(iter(action))][key] = tree if codePtr: import inspect frame = inspect.getouterframes(inspect.currentframe())[1] try: fname = frame.filename except AttributeError: fname = frame[1] mod = os.path.relpath(fname, os.path.abspath(os.path.join(os.path.dirname(__file__),'..'))) try: self.extras['%s %s' % (key,action)] = '%s:%d' % (mod,frame.lineno) except AttributeError: self.extras['%s %s' % (key,action)] = '%s:%d' % (mod,frame[2])
[docs] def getDynamics(self,key,action,state=None): if state is not None: raise DeprecationWarning('There are no longer different dynamics functions depending on the state') if not key in self.dynamics: return [] if isinstance(action,dict): # Table of actions by multiple agents return self.getDynamics(key,ActionSet(action),state) error = None try: return [self.dynamics[key][action]] except KeyError: error = 'key' except TypeError: error = 'type' if error: dynamics = [] for atom in action: try: tree = self.dynamics[key][ActionSet([atom])] dynamics.append(tree) self.dynamics[key][atom] = tree except KeyError: if len(atom) > len(atom.special): # Extra parameters try: tree = self.dynamics[key][ActionSet([atom.root()])] except KeyError: tree = None if tree: table = {} for field in atom.getParameters(): table[actionFieldKey(field)] = atom[field] dynamics.append(tree.desymbolize(table)) if len(dynamics) == 0: # See whether there are key patterns that match this action for root,tree in self.dynamics[key].items(): if isinstance(root,ActionSet) and len(root) == 1: if atom.match(next(iter(root))): dynamics.append(tree) if len(dynamics) == 0: # No action-specific dynamics, fall back to default dynamics if True in self.dynamics[key]: dynamics.append(self.dynamics[key][True]) return dynamics
[docs] def addActionEffects(self): """ For backward compatibility with scenarios that didn't do this from the beginning """ for key,table in list(self.dynamics.items()): for action,dynamics in table.items(): if action not in self.dynamics: self.dynamics[action] = {} self.dynamics[action][key] = dynamics if action is not True and len(action) == 1: atom = next(iter(action)) if atom not in self.dynamics: self.dynamics[atom] = {} self.dynamics[atom][key] = dynamics
[docs] def getActionEffects(self,joint,keySet,dynamics=None): """ :param uncertain: True iff there is uncertainty about which actions will be performed """ if dynamics is None: dynamics = {} if isinstance(joint,Action): return self.getActionEffects(ActionSet(joint),keySet,dynamics) elif isinstance(joint,ActionSet): for key,tree in self.dynamics[joint].items(): if key in keySet: try: dynamics[key].append(tree) except KeyError: dynamics[key] = [tree] if len(joint) > 1: for action in joint: self.getActionEffects(ActionSet(action),keySet,dynamics) elif isinstance(joint,dict): for name,actions in joint.items(): if len(actions) == 1: # Single action choice self.getActionEffects(next(iter(actions)),keySet,dynamics) else: # Multiple possible actions trees = {} subjectNow = actionKey(next(iter(actions))['subject']) subject = makeFuture(subjectNow) for action in actions: partial = self.getActionEffects(ActionSet(action),keySet) for key,subtree in partial.items(): assert len(subtree) == 1,'Unable to merge concurrent effects of %s on %s' % (action,key) try: trees[key][action] = subtree[0] except KeyError: trees[key] = {action: subtree[0]} for key,branches in trees.items(): values = [] tree = {} for action,subtree in branches.items(): value = self.value2float(subjectNow,action) tree[len(values)] = subtree values.append(value) tree['if'] = equalRow(subject,values) if len(branches) < len(actions): # Need an else branch try: tree[None] = self.dynamics[True][key] except KeyError: tree[None] = noChangeMatrix(key) tree = makeTree(tree) try: dynamics[key].append(tree) except KeyError: dynamics[key] = [tree] # Look for "universal" dynamics for any state feature with no other dynamics for key,tree in self.dynamics.get(True,{}).items(): if key in keySet and key not in dynamics: dynamics[key] = [tree] else: raise TypeError('Unknown type of action specification: %s' % (joint.__class__.__name__)) return dynamics
[docs] def getConditionalDynamics(self,action,key,tree=None): if action not in self.conditionalDynamics: self.conditionalDynamics[action] = {} if key not in self.conditionalDynamics[action]: newTree = makeTree({'if': equalRow(actionKey(action['subject'],True),ActionSet(action)), True: copy.deepcopy(tree), False: noChangeMatrix(key)}) self.conditionalDynamics[action][key] = newTree.desymbolize(self.symbols) return self.conditionalDynamics[action][key]
[docs] def getAncestors(self,keySubset,actions): """ :returns: a set of keys that potentially influence at least one key in the given set of keys (including this set as well) """ remaining = set(keySubset) result = set() while remaining: key = remaining.pop() result.add(key) dynamics = self.getDynamics(key,actions) if dynamics: for tree in dynamics: remaining |= tree.getKeysIn() - result - {CONSTANT} return result
"""------------------""" """Turn order methods""" """------------------"""
[docs] def setOrder(self, order): """ Equivalent to the more pythonic set_order """ self.set_order(order)
[docs] def set_order(self, order): """ Initializes the turn order to the given order :param order: the turn order, as a list of names (each agent acts in sequence) or a list of sets of names (agents within a set acts in parallel) :type order: str[] or {str}[] """ self.maxTurn = len(order) - 1 for index in range(len(order)): if isinstance(order[index], set): names = order[index] else: names = [order[index]] for name in names: # Insert turn key key = turnKey(name) self.turnKeys.add(key) if key not in self.variables: self.defineVariable(key, int, hi=self.maxTurn) self.setFeature(key, index) # Insert action key key = stateKey(name, keys.ACTION) if key not in self.variables: self.defineVariable(key, ActionSet, description='Action performed by %s' % (name)) if len(self.agents[name].actions) == 0: raise ValueError(f'Agent {name} is included in turn order, but has no actions defined') self.setFeature(key, next(iter(self.variables[key]['elements'])))
[docs] def setAllParallel(self): """ Utility method that sets the order to be all agents (who have actions) acting in parallel """ self.setOrder([{name for name,agent in self.agents.items() if agent.actions}])
[docs] def next(self,vector=None): """ :returns: a list of agents (by name) whose turn it is in the current epoch :rtype: str[] """ if vector is None: vector = self.state if len(self.turnKeys) == 0 and vector is self.state: self.turnKeys = {key for key in vector.keys() if isTurnKey(key)} if isinstance(vector,VectorDistributionSet): agents = set() for key in self.turnKeys: if key in vector.keyMap: substate = vector.keyMap[key] if substate is None: value = vector.certain[key] else: subvector = vector.distributions[substate] if len(subvector) == 1: value = subvector.first()[key] else: dist = subvector.marginal(key) assert len(dist) == 1,'World.next() does not operate on uncertain turns:\n%s' % (dist) value = dist.first() if value == 0: agents.add(turn2name(key)) return agents elif isinstance(vector,VectorDistribution): items = [] for key in self.turnKeys: if key in vector.keys(): dist = vector.marginal(key) assert len(dist) == 1,'World.next() does not operate on uncertain turns:\n%s' % (dist) items.append((key,dist.first())) else: items = [i for i in vector.items() if isTurnKey(i[0])] if len(items) == 0: # No turn information in vector return [] value = min(map(lambda i: int(i[1]),items)) return map(lambda i: turn2name(i[0]),filter(lambda i: int(i[1]) == value,items))
[docs] def deltaOrder(self,actions,vector): """ .. warning:: assumes that no one is acting out of turn :returns: the new turn sequence resulting from the performance of the given actions """ potentials = [name for name in self.agents.keys() if turnKey(name) in vector] if len(potentials) == 0: return None if self.maxTurn is None: self.maxTurn = max([vector[turnKey(name)] for name in potentials]) # Figure out who has acted if isinstance(actions,ActionSet): table = {} for atom in actions: try: table[atom['subject']] = ActionSet(list(table[atom['subject']])+[atom]) except KeyError: table[atom['subject']] = ActionSet(atom) elif isinstance(actions,dict): table = actions actions = ActionSet() for atom in table.values(): actions = actions | atom else: assert isinstance(actions,list) actionList = actions table = {} actions = set() for atom in actionList: table[atom['subject']] = True actions.add(atom) actions = ActionSet(actions) # Find dynamics for each turn delta = psychsim.pwl.KeyedMatrix() for name in potentials: key = turnKey(name) dynamics = self.getTurnDynamics(key,table) # Combine any turn dynamics into single matrix matrix = dynamics[0][vector] assert isinstance(matrix,psychsim.pwl.KeyedMatrix),'Dynamics must be deterministic' delta.update(matrix) return delta
[docs] def getTurnDynamics(self,key,actions): dynamics = self.getDynamics(key,actions) if len(dynamics) == 0: # Create default dynamics agent = turn2name(key) if agent in actions: # This agent took a turn; go to the end of the line tree = psychsim.pwl.makeTree(psychsim.pwl.setToConstantMatrix(key,self.maxTurn)) else: tree = psychsim.pwl.makeTree(psychsim.pwl.incrementMatrix(key,-1)) # self.setTurnDynamics(name,actions,tree) dynamics = [tree] return {key: dynamics}
[docs] def getActions(self,vector,agents=None,actions=None): """ :returns: the set of all possible action combinations that could happen in the given state """ if agents is None: agents = self.next(vector) if actions is None: actions = set([ActionSet()]) if len(agents) > 0: newActions = set() name = agents.pop() for subset in actions: for action in self.agents[name].getLegalActions(vector): newActions.add(subset | action) return self.getActions(vector,agents,newActions) else: return actions
[docs] def rotateTurn(self,name,state=None): """ Changes the given state vector so that the named agent is up next, preserving the current turn sequence """ if state is None: state = self.state keys = {k for k in state.keys() if isTurnKey(k)} sub = state.substate(keys) if len(sub) > 1: sub = state.merge(sub) else: sub = next(iter(sub)) dist = state.distributions[sub] assert len(dist) == 1,'Currently unable to handle uncertain turn state' vector = dist.first() del dist[vector] hi = max(vector.values()) delta = vector[turnKey(name)] for key,old in vector.items(): if old >= delta: vector[key] = old - delta else: vector[key] = hi + old - delta + 1 dist[vector] = 1.
"""-------------""" """State methods""" """-------------"""
[docs] def defineVariable(self,key,domain=float,lo=0.,hi=1.,description=None, combinator=None,codePtr=False, avoid_beliefs=True): """ Define the type and domain of a given element of the state vector :param key: string label for the column being defined :type key: str :param domain: the domain of values for this feature. Acceptable values are: - float: continuous range - int: discrete numeric range - bool: True/False value - list: enumerated set of discrete values - ActionSet: enumerated set of actions, of the named agent (as key) :type domain: class :param lo: for float/int features, the lowest possible value. for list features, a list of possible values. :type lo: float/int/list :param hi: for float/int features, the highest possible value :type hi: float/int :param description: optional text description explaining what this state feature means :type description: str :param combinator: how should multiple dynamics for this variable be combined """ if avoid_beliefs: for agent in self.agents.values(): for model in agent.models.values(): if 'beliefs' in model and not model['beliefs'] is True: raise RuntimeError('Define all variables before setting beliefs (%s:%s)' \ % (agent.name,model['name'])) if key in self.variables: raise NameError('Variable %s already defined' % (key)) if key[-1] == "'": raise ValueError('Ending single-quote reserved for indicating future state') self.variables[key] = {'domain': domain, 'description': description, 'combinator': combinator} if domain is float: self.variables[key].update({'lo': lo,'hi': hi}) elif domain is int: self.variables[key].update({'lo': int(lo),'hi': None if hi is None else int(hi)}) elif domain is list or domain is set: assert isinstance(lo,list) or isinstance(lo,set),\ 'Please provide set/list of elements for features of the set/list type' self.variables[key].update({'elements': lo,'lo': None,'hi': None}) for element in lo: if element not in self.symbols: self.symbols[element] = len(self.symbols) self.symbolList.append(element) elif domain is bool: self.variables[key].update({'lo': 0,'hi': 1}) elif domain is ActionSet: # The actions of an agent if isinstance(lo,float): if key in self.agents: lo = self.agents[key].actions else: lo = self.agents[keys.state2agent(key)].actions if description is None: description = '; '.join([', '.join(['%s: %s' % (act,act.description) \ for act in actSet]) for actSet in lo]) self.variables[key].update({'elements': lo,'lo': None,'hi': None, 'description': description}) for action in lo: if action not in self.symbols: self.symbols[action] = len(self.symbols) self.symbolList.append(action) else: raise ValueError('Unknown domain type %s for %s' % (domain,key)) self.variables[key]['key'] = key self.dependency.clear() if codePtr: if codePtr is True: import inspect for frame in inspect.getouterframes(inspect.currentframe()): try: fname = frame.filename except AttributeError: fname = frame[1] if fname != __file__: break else: frame = codePtr mod = os.path.relpath(frame.filename, os.path.abspath(os.path.join(os.path.dirname(__file__),'..'))) try: self.extras[key] = '%s:%d' % (mod,frame.lineno) except AttributeError: self.extras[key] = '%s:%d' % (mod,frame[2])
[docs] def setFeature(self, key, value, state=None, noclobber=False, recurse=False): self.set_feature(key, value, state, noclobber, recurse)
[docs] def set_feature(self, key, value, state=None, noclobber=False, recurse=False): """ Set the value of an individual element of the state vector :param key: the label of the element to set :type key: str :type value: float or L{psychsim.probability.Distribution} :param state: the state distribution to modify (default is the current world state) :type state: L{VectorDistribution} :param recurse: if True, set this feature to the given value for all agents' beliefs (and beliefs of beliefs, etc.) """ assert key in self.variables, 'Unknown element "%s"' % (key) # if state is None or state is self.state: # for agent in self.agents.values(): # for model in agent.models.values(): # if 'beliefs' in model and not model['beliefs'] is True and \ # not key in model['beliefs']: # raise RuntimeError('Set all variable values before setting beliefs') if state is None: state = self.state if isinstance(state, VectorDistributionSet): if noclobber: # Posterior update using existing distribution if isinstance(value, Distribution): raise TypeError('Unable to set posterior distribution on %s within joint distribution over %s' % (key, ', '.join(sorted({key for key in state.subDistribution(key).keys() if key != CONSTANT})))) else: state[key] = self.value2float(key, value) else: # Set new value for this feature state.join(key, self.value2float(key, value)) elif isinstance(state, VectorDistribution): state.join(key, self.value2float(key, value)) else: assert not isinstance(value, Distribution) state[key] = self.value2float(key, value) if recurse: for name, models in self.get_current_models(state).items(): inconsistent = [] for model in models: beliefs = self.agents[name].models[model].get('beliefs', True) if beliefs is not True and key in beliefs: if noclobber: try: self.setFeature(key, value, beliefs, noclobber) except ValueError: # This model cannot believe this value to be possible inconsistent.append(model) else: self.setFeature(key, value, beliefs, noclobber) if inconsistent: state.delete_value(modelKey(name), set(inconsistent))
[docs] def setJoint(self,distribution,state=None): """ Sets the state for a combination of state features :param distribution: The joint distribution to join to the current state :type distribution: VectorDistribution :raises ValueError: if joint is over features already present in state :raises ValueError: if joint is not over at least two features """ keys = distribution.keys() if len(keys) < 2: raise ValueError('Use setFeature if not setting the value for multiple features') if state is None: state = self.state for key in keys: if key in state: sub = state.distributions[state.keyMap[key]] if len(sub) == 1: # Certain sub.deleteKey(key) if len(sub.keys()) <= 1: # Empty del state.distributions[state.keyMap[key]] del state.keyMap[key] else: raise ValueError('Unable to extricate pre-existing distribution for %s' % (key)) substate = 0 while substate in state.distributions: substate += 1 for key in keys: if key != CONSTANT: state.keyMap[key] = substate value = distribution.__class__() for vec in distribution.domain(): value[vec.__class__({key: self.value2float(key,vec[key]) for key in vec})] = distribution[vec] if CONSTANT not in keys: value.join(CONSTANT,1.) state.distributions[substate] = value return substate
[docs] def encodeVariable(self,key,value): raise DeprecationWarning('Use value2float method instead')
[docs] def float2value(self,key,flt): if isFuture(key): key = makePresent(key) if isinstance(flt,psychsim.probability.Distribution): # Decode each element return flt.__class__([(self.float2value(key, element), prob) for element, prob in flt.items()]) elif isinstance(flt,set): return {self.float2value(key,element) for element in flt} elif self.variables[key]['domain'] is bool: if flt > 0.5: return True else: return False elif self.variables[key]['domain'] is list or \ self.variables[key]['domain'] is set or \ self.variables[key]['domain'] is ActionSet: index = int(round(flt)) return self.symbolList[index] elif self.variables[key]['domain'] is int: return int(flt) # elif isModelKey(key): # return self.agents[model2name(key)].index2model(flt) else: return flt
[docs] def value2float(self,key,value): """ :returns: the float value (appropriate for storing in a L{psychsim.pwl.KeyedVector}) corresponding to the given (possibly symbolic, bool, etc.) value """ if isinstance(value,psychsim.probability.Distribution): # Encode each element return value.__class__([(self.value2float(key, element), prob) for element, prob in value.items()]) elif self.variables[key]['domain'] is bool: if value: return 1. else: return 0. # elif isModelKey(key): # return self.agents[model2name(key)].model2index(value) elif self.variables[key]['domain'] is list or self.variables[key]['domain'] is set or \ self.variables[key]['domain'] is ActionSet: return self.symbols[value] else: return value
[docs] def getFeature(self,key,state=None,unique=False): """ :param key: the label of the state element of interest :type key: str :param state: the distribution over possible worlds (default is the current world state) :type state: L{VectorDistribution} :returns: a distribution over values for the given feature :rtype: L{psychsim.probability.Distribution} """ if state is None: state = self.state assert key in self.variables or makePresent(key) in self.variables,'Unknown element "%s"' % (key) if isinstance(state, KeyedVector): return self.float2value(key,state[key]) else: marginal = state.marginal(key) if unique: assert len(marginal) == 1, 'Unique value requested for %s, but number of values is %d' % (key, len(marginal)) return self.float2value(key, marginal).first() else: return self.float2value(key, marginal)
[docs] def getValue(self,key,state=None): """ Helper method that returns a single value from a vector or a singleton distribution :param key: the label of the state element of interest :type key: str :param state: the distribution over possible worlds (default is the current world state) :type state: L{VectorDistribution} or L{psychsim.pwl.KeyedVector} :returns: a single value for the given feature """ if isinstance(state,psychsim.pwl.KeyedVector): return self.float2value(key,state[key]) else: marginal = self.getFeature(key,state) assert len(marginal) == 1,'getValue operates on only singleton distributions' return marginal.first()
[docs] def decodeVariable(self,key,distribution): raise DeprecationWarning('Use float2value method instead')
[docs] def defineState(self,entity,feature,domain=float,lo=0.,hi=1.,description=None,combinator=None, codePtr=False): return self.define_state(entity, feature, domain, lo, hi, description, combinator, codePtr)
[docs] def define_state(self, entity, feature, domain=float, lo=0, hi=1, description=None, combinator=None, codePtr=False): """ Defines a state feature associated with a single agent, or with the global world state. :param entity: if C{None}, the given feature is on the global world state; otherwise, it is local to the named agent :type entity: str """ if isinstance(entity,Agent): entity = entity.name key = stateKey(entity,feature) try: self.locals[entity][feature] = key except KeyError: self.locals[entity] = {feature: key} if not domain is None: # Haven't defined this feature yet self.defineVariable(key,domain,lo,hi,description,combinator, codePtr) return key
[docs] def setState(self, entity, feature, value, state=None, noclobber=False, recurse=False): """ For backward compatibility :param entity: the name of the entity whose state feature we're setting (does not have to be an agent) :type entity: str :type feature: str :param recurse: if True, set this feature to the given value for all agents' beliefs (and beliefs of beliefs, etc.) """ self.setFeature(stateKey(entity, feature), value, state, noclobber, recurse)
[docs] def getState(self,entity,feature,state=None,unique=False): """ For backward compatibility :param entity: the name of the entity of interest (C{None} if the feature of interest is of the world itself) :type entity: str :param feature: the state feature of interest :type feature: str :param unique: assume there is a unique true value and return it (not a Distribution) """ return self.getFeature(stateKey(entity,feature),state,unique)
[docs] def getAction(self,name=None,state=None,unique=False): """ :return: the C{ActionSet} last performed by the given entity """ return self.getFeature(actionKey(name),state,unique)
[docs] def defineRelation(self,subj,obj,name,domain=float,lo=0.,hi=1.,**kwargs): """ Defines a binary relationship between two agents :param subj: one of the agents in the relation (if a directed link, it is the "origin" of the edge) :type subj: str :param obj: one of the agents in the relation (if a directed link, it is the "destination" of the edge) :type obj: str :param name: the name of the relation (e.g., the verb to use between the subject and object) :type name: str """ key = binaryKey(subj,obj,name) try: self.relations[name][key] = {'subject': subj,'object': obj} except KeyError: self.relations[name] = {key: {'subject': subj,'object': obj}} if not domain is None: # Haven't defined this feature yet self.defineVariable(key,domain,lo,hi,**kwargs) return key
"""------------------""" """Mental model methods""" """------------------"""
[docs] def getModel(self, modelee, state=None, unique=False): """ :returns: the name of the model of the given agent indicated by the given state vector. If the given agent is a list, descends down the recursive beliefs to return the model at the bottom of that recursion :type modelee: str or str[] :type state: L{psychsim.pwl.state.VectorDistributionSet} :rtype: str """ if state is None: state = self.state if isinstance(modelee, list): model = self.getModel(modelee[0], state, unique) if len(modelee) > 1: if unique: if isinstance(modelee[0], str): return self.getModel(modelee[1:], self.agents[modelee[0]].getBelief(state, model), unique) else: return self.getModel(modelee[1:], self.agents[modelee[0].name].getBelief(state, model), unique) else: raise NotImplementedError('I am currently able to extract recursive mental models only when the result has no uncertainty. In this case, set unique flag to True.') else: return model elif isinstance(modelee, str): return self.getFeature(modelKey(modelee), state, unique) else: # Assume Agent instance return self.getFeature(modelKey(modelee.name), state, unique)
[docs] def get_current_models(self, state=None, cycle_check=False, all_models=None, tree=None, recurse=True): if state is None: state = self.state if all_models is None: all_models = set() result = {} for key in state.keys(): if isModelKey(key): name = state2agent(key) models = set(self.getFeature(key, state).domain()) if tree is not None: tree[name] = {model: {} for model in models} cycles = models & all_models if cycle_check and cycles: raise ValueError('Cycle in beliefs for models: {}'.format(', '.join(sorted(cycles)))) all_models |= models result[name] = result.get(name, set()) | models if recurse: for model in models - cycles: if self.agents[name].models[model].get('beliefs', True) is not True: beliefs = self.agents[name].getBelief(model=model) new_models = self.get_current_models(beliefs, cycle_check, all_models, tree if tree is None else tree[name][model]) for sub_name, sub_models in new_models.items(): if cycle_check: cycles = sub_models & all_models if cycles: raise ValueError('Cycle in beliefs for models: {}'.format(', '.join(sorted(cycles)))) all_models |= sub_models result[sub_name] = result.get(sub_name, set()) | sub_models return result
[docs] def getMentalModel(self,modelee,vector): raise DeprecationWarning('Substitute getModel instead (sorry for pedanticism, but a "model" may be real, not "mental")')
[docs] def setModel(self, modelee, distribution, state=None, model=None): # Make sure distribution is probability distribution over floats if state is None: state = self.state if isinstance(state, VectorDistributionSet): if not isinstance(distribution, dict): distribution = {distribution: 1.} if not isinstance(distribution, psychsim.probability.Distribution): distribution = psychsim.probability.Distribution(distribution) key = modelKey(modelee) if isinstance(state, str): # This is the name of the modeling agent (*cough* hack *cough*) self.agents[state].setBelief(key, distribution, model) else: # Otherwise, assume we're changing the model in the current state self.setFeature(key, distribution, state)
[docs] def setMentalModel(self,modeler,modelee,distribution,model=None): """ Sets the distribution over mental models one agent has of another entity @note: Normalizes the distribution given """ self.setModel(modelee,distribution,modeler,model)
[docs] def pruneModels(self,vector): """ Do you want a version of a possible world *without* all the fuss of agent models? Then *this* is the method for you! """ return psychsim.pwl.KeyedVector({key: vector[key] for key in vector.keys() if not isModelKey(key)})
[docs] def modelGC(self,check=False): """ Garbage collect orphaned models. """ for name, active_models in self.get_current_models().items(): agent = self.agents[name] parents = set(active_models) while parents: parents = {agent.models[model]['parent'] for model in parents} - {None} active_models |= parents for model in list(agent.models.keys()): if model not in active_models: del agent.models[model]
[docs] def updateModels(self,outcome,vector): for agent in self.agents.values(): label = self.getModel(agent.name,vector) model = agent.models[label] if not model['beliefs'] is True: omega = agent.observe(vector,outcome['actions']) beliefs = model['beliefs'] if not omega is True: raise NotImplementedError('Unable to update mental models under partial observability') for actor,actions in outcome['actions'].items(): # Consider each agent who *did* act actorKey = modelKey(actor) if beliefs.hasColumn(actorKey): # Agent has uncertain beliefs about this actor belief = beliefs.marginal(actorKey) prob = {} for index in belief.domain(): # Consider the hypothesis mental models of this actor hypothesis = self.agents[actor].models[self.agents[actor].index2model(index)] denominator = 0. V = {} state = psychsim.pwl.KeyedVector(outcome['old']) state[actorKey] = index for alternative in self.agents[actor].getLegalActions(outcome['old']): # Evaluate all available actions with respect to the hypothesized mental model V[alternative] = self.agents[actor].value(state,alternative,model=hypothesis['name'])['V'] if not actions in V: # Agent performed a non-prescribed action V[actions] = self.agents[actor].value(state,alternative,model=hypothesis['name'])['V'] # Convert into probability distribution of observed action given hypothesized mental model behavior = psychsim.probability.Distribution(V,hypothesis['rationality']) prob[index] = behavior[actions] # Bayes' rule prob[index] *= belief[index] # Update posterior beliefs over mental models prob = psychsim.probability.Distribution(prob) prob.normalize() belief = MatrixDistribution() for element in prob.domain(): belief[setToConstantMatrix(actorKey,element)] = prob[element] model['beliefs'].update(belief)
[docs] def scaleState(self,vector): """ Normalizes the given state vector so that all elements occur in [0,1] :param vector: the vector to normalize :type vector: L{psychsim.pwl.KeyedVector} :returns: the normalized vector :rtype: L{psychsim.pwl.KeyedVector} """ result = vector.__class__() remaining = dict(vector) # Handle defined state features for key,entry in self.variables.items(): if key in remaining: new = scaleValue(remaining[key],entry) result[key] = new del remaining[key] for name in self.agents.keys(): # Handle turns key = turnKey(name) if key in remaining: result[key] = remaining[key] / len(self.agents) del remaining[key] # Handle models key = modelKey(name) if key in remaining: result[key] = remaining[key] / len(self.agents[name].models) del remaining[key] # Handle constant term if CONSTANT in remaining: result[CONSTANT] = remaining[CONSTANT] del remaining[CONSTANT] if remaining: raise NameError('Unprocessed keys: %s' % (remaining.keys())) return result
[docs] def reachable(self,state=None,transition=None,horizon=-1,ignore=[],debug=False): """ @note: The C{__predecessors__} entry for each reachable vector is a set of possible preceding states (i.e., those whose value must be updated if the value of this vector changes :returns: transition matrix among states reachable from the given state (default is current state) :rtype: psychsim.pwl.KeyedVectorS{->}ActionSetS{->}VectorDistribution """ envelope = set() transition = {} if state is None: # Initialize with current state state = self.state[None] if isinstance(state,psychsim.pwl.VectorDistribution): for vector in state.domain(): envelope.add((vector,horizon)) else: # Initialize with given state envelope.add((state,horizon)) while len(envelope) > 0: vector,horizon = envelope.pop() assert len(vector) == len(state.domain()[0]) if debug: print('Expanding...') self.printVector(vector) node = vector.filter(ignore) # If no entry yet, then this is a start node if not node in transition: transition[node] = {'__predecessors__': set()} # Process next steps from this state if not self.terminated(vector) and horizon != 0: for actions in self.getActions(vector): if debug: print('Performing:', actions) future = self.stepFromState(vector,actions)['new'] if isinstance(future,psychsim.pwl.KeyedVector): future = psychsim.pwl.VectorDistribution({future: 1.}) transition[node][actions] = psychsim.pwl.VectorDistribution() for newVector in future.domain(): if debug: print('Result (P=%f)' % (future[newVector])) self.printVector(newVector) newNode = newVector.filter(ignore) transition[node][actions][newNode] = future[newVector] if newNode in transition: transition[newNode]['__predecessors__'].add(node) else: envelope.add((newNode,horizon-1)) transition[newNode] = {'__predecessors__': set([node])} return transition
[docs] def nearestVector(self,vector,vectors): mapping = {} for candidate in vectors: mapping[self.scaleState(candidate)] = candidate return mapping[self.scaleState(vector).nearestNeighbor(mapping.keys())]
[docs] def getDescription(self,key,feature=None): if not feature is None: raise DeprecationWarning('Use key when calling getDescription, not entity/feature combination.') return self.variables[key]['description']
[docs] def audit(self): """ Pre-flight simulation check """ errors = [] for agent in self.agents.values(): for model in agent.models.values(): if 'beliefs' in model: # Verify that I have correct beliefs about myself if modelKey(agent.name) in model['beliefs']: belief = self.getFeature(modelKey(agent.name), model['beliefs']) if len(belief) > 1: errors.append('Agent {} has uncertain belief about itself under model {}'.format(agent.name, model['name'])) else: if self.getFeature(modelKey(agent.name), model['beliefs'], unique=True) != model['name']: errors.append('Agent {} has incorrect belief about itself under model {}'.format(agent.name, model['name'])) else: errors.append('Agent {} has no belief about itself under model {}'.format(agent.name, model['name'])) if errors: raise UserWarning('\n'.join(errors))
"""---------------------""" """Visualization methods""" """---------------------"""
[docs] def explain(self,outcomes,level=1,buf=None): """ Generate a more readable interpretation of outcomes generated by L{step} :param outcomes: the return value from L{step} :type outcomes: dict[] :param level: the level of explanation detail: 0. No explanation 1. Agent decisions 2. Agent value functions 3. Agent expectations 4. Effects of expected actions 5. World state (possibly subjective) at each step :type level: int :param buf: the string buffer to put the explanation into (default is standard out) """ for outcome in outcomes: if level > 0: print('%d%%' % (outcome['probability']*100.),file=buf) if 'actions' in outcome: self.explainAction(outcome,buf,level) for name,action in outcome['actions'].items(): if not name in outcome['decisions']: # No decision made if level > 1: print(buf,'\tforced',file=buf) elif level > 1: # Explain decision self.explainDecision(outcome['decisions'][name],buf,level)
[docs] def explainAction(self,state=None,agents=None,buf=None,level=1): """ :param agents: subset of agents whose actions will be extracted (default is all acting agents) """ if state is None: state = self.state joint = {} order = {name: state[turnKey(name)] for name in self.agents if turnKey(name) in state} assert max(map(len,order.values())) == 1,'Unable to extract actions from uncertain turn orders' last = max([dist.first() for dist in order.values()]) for name,dist in sorted([(name,dist) for name,dist in order.items() if agents is None or name in agents]): if dist.first() == last: key = actionKey(name) if key in state: joint[name] = self.getFeature(key,state) if level > 0: print(joint[name],file=buf) return joint
[docs] def explainDecision(self,decision,buf=None,level=2,prefix=''): """ Subroutine of L{explain} for explaining agent decisions """ if not 'V' in decision: # No value function return actions = decision['V'].keys() actions.sort(lambda x,y: cmp(str(x),str(y))) for alt in actions: V = decision['V'][alt] print('%s\tV(%s) = %6.3f' % (prefix,alt,V['__EV__']),file=buf) if level > 2: # Explain lookahead beliefs = filter(lambda k: not isinstance(k,str),V.keys()) for state in beliefs: nodes = V[state]['projection'][:] while len(nodes) > 0: node = nodes.pop(0) tab = '' t = V[state]['horizon']-node['horizon'] for index in range(t): tab = prefix+tab+'\t' if level > 4: print('%sState:' % (tab),file=buf) self.printVector(node['old'],buf,prefix=tab,first=False) print('%s%s (V_%s=%6.3f) [P=%d%%]' % (tab,ActionSet(node['actions']),V[state]['agent'],node['R'],node['probability']*100.),file=buf) for other in node['decisions'].keys(): self.explainDecision(node['decisions'][other],buf,level,prefix+'\t\t') if level > 3: print('%sEffect:' % (tab+prefix),file=buf) self.printDelta(node['old'],node['new'],buf,prefix=tab+prefix) for index in range(len(node['projection'])): nodes.insert(index,node['projection'][index])
[docs] def resymbolize(self, state=None): if state is None: state = self.state if isinstance(state, KeyedVector) or isinstance(state, dict): return state.__class__({key: self.float2value(key, value) for key, value in state.items() if key != CONSTANT}) elif isinstance(state, VectorDistribution): return state.__class__({self.resymbolize(vector): prob for vector, prob in state.items()}) elif isinstance(state, VectorDistributionSet): result = state.__class__() result.certain = self.resymbolize(state.certain) for substate,distribution in state.distributions.items(): result.distributions[substate] = self.resymbolize(distribution) for key in distribution.keys(): result.keyMap[key] = substate return result elif isinstance(state, KeyedTree): if state.isLeaf(): return self.resymbolize(state.getLeaf()) elif state.isProbabilistic(): raise NotImplementedError else: raise NotImplementedError else: raise NotImplementedError
[docs] def printBeliefs(self,name,state=None,buf=None,prefix='',beliefs=True): models = self.getModel(name,state) previous = set() for model in models.domain(): print('%s = %s (%d%%)' % (modelKey(name),model,models[model]*100)) self.agents[name].printModel(model,buf,prefix=prefix,previous=previous)
[docs] def printState(self,distribution=None,buf=None,prefix='',beliefs=True,first=True,models=None): """ Utility method for displaying a distribution over possible worlds :type distribution: L{VectorDistribution} :param buf: the string buffer to put the string representation in (default is standard output) :param prefix: a string prefix (e.g., tabs) to insert at the beginning of each line :type prefix: str :param beliefs: if C{True}, print out inaccurate beliefs, too :type beliefs: bool """ if distribution is None: distribution = self.state print(prefix+str(self.resymbolize(distribution)).replace('\n', '\n%s' % (prefix)), file=buf) if beliefs: if models is None: models = set() for name in self.agents: if modelKey(name) in distribution: dist = self.getFeature(modelKey(name),distribution) for model in sorted(dist.domain()): self.agents[name].printModel(model,buf,reward=True,previous=models)
[docs] def printVector(self,vector,buf=None,prefix='',first=True,beliefs=False,csv=False,models=None): """ Utility method for displaying a single possible world :type vector: L{psychsim.pwl.KeyedVector} :param buf: the string buffer to put the string representation in (default is standard output) :param prefix: a string prefix (e.g., tabs) to insert at the beginning of each line :type prefix: str :param first: if C{True}, then the first line is the continuation of an existing line (default is C{True}) :type first: bool :param csv: if C{True}, then print the vector as comma-separated values (default is C{False}) :type csv: bool :param beliefs: if C{True}, then print any agent beliefs that might deviate from this vector as well (default is C{False}) :type beliefs: bool """ if models is None: models = set() if csv: if prefix: elements = [prefix] else: elements = [] entities = sorted(self.agents.keys()) entities.insert(0,keys.WORLD) change = False # Sort relations relations = {} for link,table in self.relations.items(): for key in table.keys(): subj = table[key]['subject'] obj = table[key]['object'] try: relations[subj].append((link,obj,key)) except KeyError: relations[subj] = [(link,obj,key)] for entity in entities: try: table = {state2feature(k): k for k in vector.keys() \ if keys.isStateKey(k) and keys.state2agent(k) == entity \ and not keys.isFuture(k)} except KeyError: table = {} if entity is None: label = keys.WORLD else: if entity in vector: # Action performed in this vector table['__action__'] = entity label = entity newEntity = True # Print state features for this entity for feature,key in sorted(table.items()): if key in vector: if isFuture(key): value = self.float2value(makePresent(key),vector[key]) else: value = self.float2value(key,vector[key]) if csv: elements.append(label) elements.append(feature) elements.append(value) else: future = makeFuture(key) if future in vector: fValue = self.float2value(key,vector[future]) if fValue != value: value = '%s->%s' % (value,fValue) else: value = '%s.' % (value) if newEntity: newEntity = False change = True else: label = '' if first: first = False start = '' else: start = prefix print('%s\t%-12s\t%-12s\t%s' % (start,label,feature+':', value),file=buf) # Print relationships if entity in relations: for link,obj,key in relations[entity]: if key in vector: if newEntity: print('\t%-12s' % (label),file=buf) newEntity = False print('\t\t%s\t%s:\t%s' % (link,obj,self.float2value(key,vector[key])),file=buf) # Print models (and beliefs associated with those models) if not entity is None: # Print model of this entity key = modelKey(entity) if key in vector: if csv: elements.append(label) elements.append(MODEL) elements.append(self.agents[entity].index2model(vector[key])) elif not beliefs: if first: print('\t%-12s:\t%s' % (label,self.agents[entity].index2model(vector[key])),file=buf) first = False else: print('%s\t%-12s\t%s' % (prefix,label,self.agents[entity].index2model(vector[key])),file=buf) change = True newEntity = False elif newEntity: if first: print('\t%-12s' % (label),file=buf) first = False else: print('%s\t%-12s' % (prefix,label),file=buf) self.agents[entity].printModel(index=vector[key],prefix=prefix,previous=models) change = True newEntity = False else: print('\t%12s' % (''),file=buf) self.agents[entity].printModel(index=vector[key],prefix=prefix,previous=models) newEntity = False # if not csv and not change: # print('%s\tUnchanged' % (prefix),file=buf) if csv: print(','.join(elements),file=buf)
[docs] def printDelta(self,old,new,buf=None,prefix=''): """ Prints a kind of diff patch for one state vector with respect to another :param old: the "original" state vector :type old: L{psychsim.pwl.KeyedVector} :param new: the state vector we want to see the diff of :type new: L{VectorDistribution} """ deltaDist = psychsim.pwl.VectorDistribution() for vector in new.domain(): delta = psychsim.pwl.KeyedVector() deltakeys = [] for key,entry in self.variables.items(): # Look for change in feature value deltakeys.append(key) for name in self.agents.keys(): # Look for change in mental model of this agent key = modelKey(name) if key in vector: deltakeys.append(key) if not key in old: old = psychsim.pwl.KeyedVector(vector) old[key] = self.agents[name].model2index(True) for key in deltakeys: try: diff = abs(vector[key]-old[key]) except KeyError: diff = 0. if diff > 1e-3: # Notable change delta[key] = vector[key] if TERMINATED in vector: if self.terminated(vector): delta[TERMINATED] = 1. else: delta[TERMINATED] = -1. try: deltaDist[delta] += new[vector] except KeyError: deltaDist[delta] = new[vector] self.printState(deltaDist,buf,prefix=prefix,beliefs=False)
"""---------------------""" """Serialization methods""" """---------------------"""
[docs] def save(self,filename): """ :returns: the filename used (possibly with a .psy extension added) :rtype: str """ if filename[-4:] != '.psy': filename = '%s.psy' % (filename) with bz2.BZ2File(filename,'w') as f: pickle.dump(self,f) return filename
[docs]def scaleValue(value,entry): """ :returns: a new float value that has been normalized according to the feature's domain """ if entry['domain'] is float or entry['domain'] is int: # Scale by range of possible values return float(value-entry['lo']) / float(entry['hi']-entry['lo']) elif entry['domain'] is list: # Scale by size of set of values return float(value)/float(len(entry['elements'])) else: return value
[docs]def loadWorld(filename): if filename[-4:] != '.psy': filename = '%s.psy' % (filename) f = bz2.BZ2File(filename,'rb') return pickle.load(f)