import collections.abc
from xml.dom.minidom import Document,Node
from psychsim.probability import Distribution
from . import keys
[docs]class KeyedVector(collections.abc.MutableMapping):
"""
Class for a compact, string-indexable vector
:cvar epsilon: the margin used for equality of vectors (as well as for testing hyperplanes in L{KeyedPlane})
:type epsilon: float
:ivar _string: the C{str} representation of this vector
:type _string: bool
"""
epsilon = 1e-8
def __init__(self,arg={}):
collections.abc.MutableMapping.__init__(self)
self._data = {}
self._string = None
if isinstance(arg,Node):
self.parse(arg)
elif arg:
self._data.update(arg)
[docs] def prune(self):
self._data = {key: value for key, value in self._data.items() if abs(value) >= self.epsilon}
self._string = None
return self
def __contains__(self, key):
return key in self._data
def __eq__(self, other):
delta = 0.
tested = {}
for key, value in self.items():
try:
delta += abs(value-other[key])
except KeyError:
delta += abs(value)
tested[key] = True
for key,value in other.items():
if key not in tested:
delta += abs(value)
return delta < self.epsilon
def __ne__(self,other):
return not self == other
def __add__(self,other):
result = KeyedVector(self)
for key,value in other.items():
result[key] = value + result.get(key,0.)
return result
def __neg__(self):
result = KeyedVector()
for key,value in self.items():
result[key] = -value
return result
def __sub__(self,other):
return self + (-other)
def __mul__(self,other):
if isinstance(other,KeyedVector):
# Dot product
total = 0
if len(self) > len(other):
for key,value in other.items():
if key in self:
total += value*self[key]
else:
for key,value in self.items():
if key in other:
total += value*other[key]
return total
elif isinstance(other,float):
# Scaling
result = KeyedVector({key: value*other for key,value in self.items()})
return result
else:
return other.__rmul__(self)
# raise NotImplementedError
def __rmul__(self,other):
if isinstance(other,float) or isinstance(other,int):
result = self.__class__({key: other*value for key,value in self.items()})
return result
else:
raise NotImplementedError
def __imul__(self,other):
"""
:type other: KeyedMatrix or KeyedTree
"""
try:
for row,vector in other.items():
self[row] = self*vector
except AttributeError:
matrix = other[self]
if isinstance(matrix,Distribution):
result = VectorDistribution()
for mat in matrix.domain():
vector = self.__class__(self)
vector *= mat
result.addProb(vector,matrix[mat])
return result
else:
for row,vector in matrix.items():
self[row] = self*vector
return self
[docs] def copy_value(self, old_key, new_key):
"""
Modifies the state so that the distribution over the new key's values is identical to that of the old key
"""
self[new_key] = self[old_key]
[docs] def rollback(self,future=None):
if future is None:
future = [key for key in self if keys.isFuture(key)]
for key in future:
self[keys.makePresent(key)] = self[key]
del self[key]
def __getitem__(self,key):
return self._data[key]
def __setitem__(self,key,value):
self._string = None
self._data[key] = value
def __delitem__(self,key):
self._string = None
del self._data[key]
def __iter__(self):
return self._data.__iter__()
def __len__(self):
return len(self._data)
[docs] def normalize(self):
"""
Multiplies all of the weights so that the smallest weight is 1 and the relative values are preserved
:return: the multiplier used
"""
alpha = 1/min([abs(v) for v in self._data.values()])
if abs(alpha-1) > self.epsilon:
self._string = None
for key in self._data.keys():
self._data[key] *= alpha
return alpha
[docs] def desymbolize(self,table,debug=False):
result = self.__class__()
for key,value in self.items():
if isinstance(value,float) or isinstance(value,int):
result[key] = value
else:
result[key] = table[value]
return result
[docs] def makeFuture(self,keyList=None):
"""
Transforms this vector to refer to only future versions of its columns
:param keyList: If present, only references to these keys are made future
"""
return self.changeTense(True,keyList)
[docs] def makePresent(self,keyList=None):
return self.changeTense(False,keyList)
[docs] def changeTense(self,future=True,keyList=None):
if keyList is None:
keyList = self.keys()
for key in keyList:
if key in self and not key == keys.CONSTANT:
if future:
assert not keys.isFuture(key)
value = self[key]
del self[key]
if future:
self[keys.makeFuture(key)] = value
else:
self[keys.makePresent(key)] = value
[docs] def filter(self,ignore):
"""
:return: a copy of me applying the given lambda expression to the keys (if a list is provided, then any keys in that list are dropped out)
:rtype: KeyedVector
"""
if isinstance(ignore,list):
test = lambda k: not k in ignore
else:
test = ignore
result = self.__class__()
for key in filter(test,self.keys()):
result[key] = self[key]
return result
[docs] def nearestNeighbor(self,vectors):
"""
:return: the vector in the given set that is closest to me
:rtype: KeyedVector
"""
bestVector = None
bestValue = None
for vector in vectors:
d = self.distance(vector)
if bestVector is None or d < bestValue:
bestValue = d
bestVector = vector
return bestVector
[docs] def distance(self,vector):
"""
:return: the distance between the given vector and myself
:rtype: float
"""
d = 0.
for key in self.keys():
d += pow(self[key]-vector[key],2)
return d
def __gt__(self,other):
return sorted(self.items()) > sorted(other.items())
def __str__(self):
if self._string is None:
mykeys = list(self.keys())
mykeys.sort()
self._string = '\n'.join(['%s: %s' % (k,self[k]) for k in mykeys])
return self._string
[docs] def sortedString(self):
maxLength = max([len(key) for key in self])+1
return '\n'.join(['{:{width}} {}'.format(key+':',value,width=maxLength) for key,value in sorted(self.items())])
[docs] def hyperString(self):
"""
:return: a string representation of this vector treating it as a weighted sum
"""
def term2str(coef,key):
if isinstance(coef,float):
if key == keys.CONSTANT:
return '%5.3f' % (coef)
else:
return '%5.3f*%s' % (coef,key)
elif key == keys.CONSTANT:
if isinstance(coef,int):
return '%d' % (coef)
else:
return '%s' % (coef)
elif coef == 1:
return '%s' % (key)
elif coef == -1:
return '-%s' % (key)
else:
return '%d*%s' % (coef,key)
rowStr = None
items = sorted(self.items())
if items[0][0] == keys.CONSTANT:
items.append(items[0])
del items[0]
for key,coef in items:
substr = term2str(coef,key)
if rowStr is None:
rowStr = substr
elif substr[0] == '-':
rowStr += substr
else:
rowStr += '+%s' % (substr)
return rowStr
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__,dict(self))
def __hash__(self):
return hash(tuple(self._data.items()))
# return hash(frozenset(self._data.items()))
[docs] def diff(self, other):
my_keys = set(self.keys())
yr_keys = set(other.keys())
return sorted((my_keys-yr_keys)|(yr_keys-my_keys)|{k for k in my_keys&yr_keys if self[k] != other[k]})
[docs]class VectorDistribution(Distribution):
"""
A class representing a L{Distribution} over L{KeyedVector} instances
"""
# def __init__(self,args=None):
# if args is None:
# args = {KeyedVector({keys.CONSTANT:1}):1}
# Distribution.__init__(self,args)
def __contains__(self, key):
return key in self.first()
[docs] def keys(self):
"""
:return: The keys of the vectors in the domain (assumed to be uniform),
NOT the keys of the domain itself
"""
if len(self) > 0:
return {k for k in self.first().keys() if k != keys.CONSTANT}
else:
return set()
[docs] def join(self, key, value):
"""
Modifies the distribution over vectors to have the given value for the given key
:param key: the key to the column to modify
:type key: str
:param value: either a single value to apply to all vectors, or else a L{Distribution} over possible values
"""
if isinstance(value, Distribution) and len(value) > 1:
# Uncertain values to be joined
new_items = []
for my_el, my_prob in self.items():
for yr_el, yr_prob in value.items():
new_row = my_el.__class__(my_el)
new_row[key] = yr_el
new_items.append((new_row, my_prob*yr_prob))
self._Distribution__items = new_items
else:
if isinstance(value, Distribution):
value = value.first()
for index, item in enumerate(self._Distribution__items):
item[0][key] = value
self._Distribution__items[index] = (item[0], item[1])
[docs] def merge(self, other, inPlace=False):
"""
Merge two distributions (the passed-in distribution takes precedence over this one in case of conflict)
:type other: VectorDistribution
:param inPlace: if C{True}, modify this distribution directly; otherwise, return a new distribution (default is C{False})
:type inPlace: bool
:return: the merged distribution
:rtype: VectorDistribution
"""
if inPlace:
result = self
items = list(self.items())
result.clear()
else:
result = self.__class__()
items = self.items()
for my_el, my_prob in items:
for yr_el, yr_prob in other.items():
new_el = my_el.__class__(my_el)
new_el.update(yr_el)
result.add_prob(new_el, my_prob*yr_prob)
return result
[docs] def marginal(self, key):
result = Distribution([(row[key], prob) for row, prob in self.items()])
result.remove_duplicates()
return result
[docs] def select(self,maximize=False,incremental=False):
"""
:param incremental: if C{True}, then select each key value in series (rather than picking out a joint vector all at once, default is C{False})
"""
if incremental:
# Sample each key and keep track how likely each individual choice was
sample = KeyedVector()
keys = self.domain()[0].keys()
index = 0
while len(self) > 1:
key = keys[index]
dist = self.marginal(key)
if len(dist) > 1:
# Have to make a choice here
element,sample[key] = dist.sample(True)
# Figure out where the "spinner" ended up across entire pie chart
for other in dist.domain():
if other == element:
break
else:
sample[key] += dist[other]
for vector in self.domain():
if vector[key] != element:
del self[vector]
self.normalize()
index += 1
return sample
else:
return super().select(maximize)
[docs] def delete_column(self, key):
"""
Removes the specified column from all vectors in this distribution
:type key: str
"""
for index, item in enumerate(self._Distribution__items):
del item[0][key]
self.remove_duplicates()
[docs] def deleteKey(self, key):
"""
Removes the specified column from all vectors in this distribution
:type key: str
"""
raise DeprecationWarning('Use delete_column instead')
[docs] def hasColumn(self,key):
"""
:return: C{True} iff the given key appears in all of the vectors of this distribution
:rtype: bool
"""
raise DeprecationWarning('This needs to be re-implemented. Notify the authorities."')
def __rmul__(self, other):
if isinstance(other, KeyedVector):
result = self.__class__([(other*vector, prob) for vector, prob in self.items()])
result.remove_duplicates()
return result
else:
raise NotImplementedError
def __imul__(self, other):
for vector, prob in self.items():
vector *= other
self.remove_duplicates()
return self
[docs] def prune_probability(self, probThreshold, true=None):
if true is None:
return super().prune_probability(probThreshold)
else:
raise NotImplementedError('Unable to preserve true state')
# for vec in self.domain():
# if self[vec] < probThreshold:
# if true:
# for key in true:
# if vec[key] != true[key]:
# break
# else:
# # Has the true state, so don't delete
# continue
def __deepcopy__(self, memo):
return self.__class__([(vector.__class__(vector), prob) for vector, prob in self.items()])
[docs] def copy_value(self, old_key, new_key):
"""
Modifies the state so that the distribution over the new key's values is identical to that of the old key
"""
for index, item in enumerate(self._Distribution__items):
item[0][new_key] = item[0][old_key]
self.remove_duplicates()
[docs] def rollback(self, future=None):
for index, item in enumerate(self._Distribution__items):
item[0].rollback(future)
self.remove_duplicates()
return self
[docs] def replace(self, substitution, key=None):
"""
Replaces column values, either across all columns, or only for the specified column
"""
for vector in self.domain():
if key is None:
key_list = vector.keys()
elif not isinstance(key, list):
key_list = [key]
else:
key_list = key
for key in key_list:
if vector[key] in substitution:
prob = self[vector]
del self[vector]
vector[key] = substitution[vector[key]]
self[vector] = prob
[docs] def domain(self, key=None):
if isinstance(key, str):
return {v[key] for v in self.domain()}
elif key is None:
return super().domain()
else:
raise NotImplementedError('Domain available for only a single variable')
[docs] def group_by_certainty(self, suppress_certain=False):
vectors = list(self.domain())
certain = {key: vectors[0][key] for key in self.keys()}
for vector in vectors[1:]:
for key, value in list(certain.items()):
if vector[key] != value:
del certain[key]
certain_str = str(self.__class__({vectors[0].__class__(certain): 1}))
uncertain_str = str(self.__class__({vectors[0].__class__({key: vector[key] for key in self.keys()-certain.keys()}): self[vector]
for vector in vectors}))
return uncertain_str if suppress_certain else '{}\n{}'.format(certain_str, uncertain_str)
[docs] def element_to_str(self, element):
return element.sortedString()