# Source code for neet.boolean.logicnetwork

"""
.. currentmodule:: neet.boolean.logicnetwork

.. testsetup:: logicnetwork

    from neet.boolean.logicnetwork import LogicNetwork

Logic-based Networks
====================
"""
import re
from neet.python import long
from neet.statespace import StateSpace
from neet.exceptions import FormatError
import networkx as nx

class LogicNetwork(object):
    """
    The LogicNetwork class represents boolean networks whose update rules
    follow logic relations among nodes. Each node state is expressed as ``0``
    or ``1``.
    """

    def __init__(self, table, names=None, reduced=False):
        """
        Construct a network from a logic truth table.

        A truth table stores a list of tuples, one for each node in order. A
        tuple of the form ``(A, {C1, C2, ...})`` at index ``i`` provides the
        activation conditions for the node of index ``i``. ``A`` is a tuple
        marking the indices of the nodes which influence the state of node
        ``i`` via logic relations. ``{C1, C2, ...}`` is a set, each element of
        which is the collection of binary states of these influencing nodes
        that would activate node ``i``, setting it to ``1``. Any other
        collection of states of nodes in ``A`` are assumed to deactivate node
        ``i``, setting it to ``0``.

        ``C1``, ``C2``, etc. are sequences (``tuple`` or ``str``) of binary
        digits, each being the binary state of corresponding node in ``A``.

        .. rubric:: Examples

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((0,), {'0'})])
            >>> net.size
            1
            >>> net.table
            [((0,), {'0'})]

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1,), {'0', '1'}), ((0,), {'1'})])
            >>> net.size
            2
            >>> net.table == [((1,), {'0', '1'}), ((0,), {'1'})]
            True

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1, 2), {'01', '10'}),
            ...                     ((0, 2), ((0, 1), '10', [1, 1])),
            ...                     ((0, 1), {'11'})], ['A', 'B', 'C'])
            >>> net.size
            3
            >>> net.names
            ['A', 'B', 'C']
            >>> net.table == [((1, 2), {'01', '10'}),
            ...               ((0, 2), {'01', '11', '10'}), ((0, 1), {'11'})]
            True

        :param table: the logic table
        :param names: names of nodes, default None
        :param reduced: if True, call :meth:`reduce_table` after the table
                        has been stored
        :raises TypeError: if ``table`` or ``names`` is not a list or tuple
        :raises ValueError: if a row is malformed or the number of names does
                            not match the number of rows
        :raises IndexError: if a source index lies outside the network
        """
        if not isinstance(table, (list, tuple)):
            raise TypeError("table must be a list or tuple")

        self.__size = len(table)

        # NOTE: ``self.names`` is only created when names are supplied;
        # ``to_networkx_graph`` relies on ``hasattr`` to detect this.
        if names:
            if not isinstance(names, (list, tuple)):
                raise TypeError("names must be a list or tuple")
            elif len(names) != self.__size:
                raise ValueError("number of names must match network size")
            else:
                self.names = list(names)

        # Store positive truth table for human reader.
        self.table = []
        for row in table:
            # Validate the shape of the row.
            if not (isinstance(row, (list, tuple)) and len(row) == 2):
                raise ValueError("Invalid table format")
            # Validate incoming indices. Negative indices are rejected too:
            # they would otherwise yield a fractional bit mask during
            # encoding.
            for idx in row[0]:
                if not 0 <= idx < self.__size:
                    raise IndexError("mask index out of range")
            # Validate truth table of the sub net.
            if not isinstance(row[1], (list, tuple, set)):
                raise ValueError("Invalid table format")
            # Normalise every condition to a string of digits so that
            # tuples, lists and strings describing the same states compare
            # equal.
            conditions = set()
            for condition in row[1]:
                conditions.add(''.join([str(long(s)) for s in condition]))
            self.table.append((row[0], conditions))

        if reduced:
            self.reduce_table()

        self._state_space = StateSpace(self.__size, base=2)

        # Encode truth table for faster computation.
        self._encode_table()

    @property
    def size(self):
        """
        The number of nodes in the network.

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1, 2), {'01', '10'}),
            ...     ((0, 2), {'01', '10', '11'}), ((0, 1), {'11'})])
            >>> net.size
            3

        :type: int
        """
        return self.__size

    def _encode_table(self):
        """
        Build ``self._encoded_table`` from ``self.table``.

        Each row ``(indices, conditions)`` becomes ``(mask, codes)`` where
        ``mask`` has bit ``idx`` set for every source index and ``codes`` is
        the set of integers obtained by setting bit ``idx`` for every source
        whose state in a condition is ``1``. Low index maps to low-order bit.
        """
        self._encoded_table = []
        for indices, conditions in self.table:
            # Encode the mask. Bitwise OR (rather than addition) keeps the
            # mask correct even if an index is listed more than once.
            mask_code = long(0)
            for idx in indices:
                mask_code |= long(1) << long(idx)
            # Encode each condition of truth table.
            encoded_sub_table = set()
            for condition in conditions:
                encoded_condition = long(0)
                for idx, state in zip(indices, condition):
                    if int(state):
                        encoded_condition |= long(1) << long(idx)
                encoded_sub_table.add(encoded_condition)
            self._encoded_table.append((mask_code, encoded_sub_table))

    def is_dependent(self, target, source):
        """
        Return True if state of ``target`` is influenced by the state of
        ``source``.

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1, 2), {'01', '10'}),
            ...     ((0, 2), {'01', '10', '11'}), ((0, 1), {'11'})])
            >>> net.is_dependent(0, 0)
            False
            >>> net.is_dependent(0, 2)
            True

        :param target: index of the target node
        :param source: index of the source node
        :returns: whether the target node is dependent on the source
        """
        sub_table = self.table[target]
        if source not in sub_table[0]:  # No explicit dependency.
            return False

        # Determine implicit dependency: for each assignment of the other
        # sources, +1 when the condition has source == 1 and -1 when it has
        # source == 0. A non-zero balance means flipping the source changes
        # the outcome for that assignment.
        i = sub_table[0].index(source)
        counter = {}
        for state in sub_table[1]:
            # State excluding source.
            state_sans_source = state[:i] + state[i + 1:]
            delta = 1 if long(state[i]) == 1 else -1
            counter[state_sans_source] = (
                counter.get(state_sans_source, 0) + delta)

        return any(counter.values())  # True when states are uneven.

    def reduce_table(self):
        """
        Reduce truth table by removing input nodes which have no logic
        influence from the truth table of each node.

        .. note::
            This function introduces the identity function for all nodes
            which have no inputs. This ensures that every node has a
            well-defined logical function. The example below demonstrates
            this with node ``1``.

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((0,1), {'00', '10'}), ((0,), {'0', '1'})])
            >>> net.table == [((0,1), {'00', '10'}), ((0,), {'0', '1'})]
            True
            >>> net.reduce_table()
            >>> net.table == [((1,), {'0'}), ((1,), {'1'})]
            True
        """
        reduced_table = []
        for node, (sources, conditions) in enumerate(self.table):
            # Keep only the sources the node really depends on, remembering
            # their positions inside each condition string.
            reduced_sources = []
            reduced_indices = []
            for idx, source in enumerate(sources):
                if self.is_dependent(node, source):
                    reduced_sources.append(source)
                    reduced_indices.append(idx)

            if reduced_sources:  # Node state is influenced by other nodes.
                reduced_conditions = set()
                for condition in conditions:
                    reduced_condition = ''.join([str(condition[idx])
                                                 for idx in reduced_indices])
                    reduced_conditions.add(reduced_condition)
            else:
                # Node state is not influenced by other nodes including
                # itself.
                reduced_sources = (node, )
                if not conditions:
                    # If original conditions is empty, node is never
                    # activated.
                    reduced_conditions = set()
                elif node in sources:
                    # Node is always activated no matter its previous state.
                    reduced_conditions = {'0', '1'}
                else:
                    # Node state is not changed.
                    reduced_conditions = {'1'}

            reduced_table.append((tuple(reduced_sources), reduced_conditions))

        self.table = reduced_table

        # The encoded table must follow the human-readable one.
        self._encode_table()

    def state_space(self):
        """
        Return a :class:`neet.statespace.StateSpace` object for the network.

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1, 2), {'01', '10'}),
            ...     ((0, 2), {'01', '10', '11'}), ((0, 1), {'11'})])
            >>> net.state_space()
            <neet.statespace.StateSpace object at 0x...>
            >>> space = net.state_space()
            >>> list(space)
            [[0, 0, 0], [1, 0, 0], [0, 1, 0], [1, 1, 0], [0, 0, 1], [1, 0, 1], [0, 1, 1], [1, 1, 1]]

        :returns: the network's :class:`neet.statespace.StateSpace`
        """
        return self._state_space

    def _unsafe_update(self, net_state, index=None, pin=None, values=None):
        """
        Unsafely update node states according to the truth table.

        If ``index`` is provided, only update the node at ``index``. If
        ``index`` is not provided, update all nodes. The input ``net_state``
        is modified in place and returned. No validation of the arguments is
        performed.

        .. rubric:: Examples

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((0,), {'0'})])
            >>> net._unsafe_update([0], 0)
            [1]
            >>> net._unsafe_update([1])
            [0]

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1,), {'0', '1'}), ((0,), {'1'})])
            >>> net._unsafe_update([1, 0], 0)
            [1, 0]
            >>> net._unsafe_update([1, 0], 1)
            [1, 1]
            >>> net._unsafe_update([0, 0])
            [1, 0]

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1, 2), {'01', '10'}),
            ...                     ((0, 2), {(0, 1), '10', (1, 1)}),
            ...                     ((0, 1), {'11'})])
            >>> net.size
            3
            >>> net._unsafe_update([0, 1, 0])
            [1, 0, 0]
            >>> net._unsafe_update([0, 0, 1])
            [1, 1, 0]
            >>> net._unsafe_update([0, 0, 1], 1)
            [0, 1, 1]
            >>> net._unsafe_update([0, 0, 1], pin=[1])
            [1, 0, 0]
            >>> net._unsafe_update([0, 0, 1], pin=[0, 1])
            [0, 0, 0]
            >>> net._unsafe_update([0, 0, 1], values={0: 0})
            [0, 1, 0]
            >>> net._unsafe_update([0, 0, 1], pin=[1], values={0: 0})
            [0, 0, 0]

        :param net_state: a sequence of binary node states
        :type net_state: sequence
        :param index: the index to update (or None)
        :type index: int or None
        :param pin: the indices to pin (or None)
        :type pin: sequence
        :param values: override values
        :type values: dict
        :returns: the updated states
        """
        # Encode once, before any node is written, so that every node is
        # computed from the same (previous) network state.
        encoded_state = self.state_space()._unsafe_encode(net_state)

        if index is None:
            indices = range(self.__size)
        else:
            indices = [index]

        if pin is None:
            pin = []

        for idx in indices:
            if idx in pin:
                continue
            mask, condition = self._encoded_table[idx]
            # Keep only the bits of the sources of node ``idx``.
            sub_net_state = mask & encoded_state
            net_state[idx] = 1 if sub_net_state in condition else 0

        # Overrides are applied last so they win over the computed states.
        if values:
            for k, v in values.items():
                net_state[k] = v

        return net_state

    def update(self, net_state, index=None, pin=None, values=None):
        """
        Update node states according to the truth table.

        If ``index`` is provided, only update the node at ``index``. If
        ``index`` is not provided, update all nodes. ``pin`` provides the
        indices of which the nodes' states are forced to remain unchanged.
        The arguments are validated and the work is then delegated to
        :meth:`_unsafe_update`.

        .. rubric:: Examples

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((0,), {'0'})])
            >>> net.update([0], 0)
            [1]
            >>> net.update([1])
            [0]

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1,), {'0', '1'}), ((0,), {'1'})])
            >>> net.update([1, 0], 0)
            [1, 0]
            >>> net.update([1, 0], 1)
            [1, 1]
            >>> net.update([0, 0])
            [1, 0]

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1, 2), {'01', '10'}),
            ...                     ((0, 2), {(0, 1), '10', (1, 1)}),
            ...                     ((0, 1), {'11'})])
            >>> net.size
            3
            >>> net.update([0, 0, 1], 1)
            [0, 1, 1]
            >>> net.update([0, 1, 0])
            [1, 0, 0]
            >>> net.update([0, 0, 1])
            [1, 1, 0]
            >>> net.update([0, 0, 1], pin=[1])
            [1, 0, 0]
            >>> net.update([0, 0, 1], pin=[0, 1])
            [0, 0, 0]
            >>> net.update([0, 0, 1], values={0: 0})
            [0, 1, 0]
            >>> net.update([0, 0, 1], pin=[1], values={0: 0})
            [0, 0, 0]

        :param net_state: a sequence of binary node states
        :type net_state: sequence
        :param index: the index to update (or None)
        :type index: int or None
        :param pin: the indices to pin (or None)
        :type pin: sequence
        :param values: override values
        :type values: dict
        :returns: the updated states
        :raises ValueError: if ``net_state`` is not in the state space, if a
                            value in ``values`` is not 0 or 1, or if a pinned
                            node also appears in ``values``
        """
        if net_state not in self.state_space():
            raise ValueError(
                "the provided state is not in the network's state space")

        if values and any(v not in (0, 1) for v in values.values()):
            raise ValueError("invalid state in values argument")

        if pin and values and any(k in pin for k in values):
            raise ValueError("cannot set a value for a pinned state")

        return self._unsafe_update(net_state, index, pin, values)

    @classmethod
    def read_table(cls, table_path, reduced=False):
        """
        Read a network from a truth table file.

        A logic table file starts with a table title which contains names of
        all nodes. It is a line marked by ``##`` at the beginning with node
        names separated by commas or spaces. This line is required. For
        artificial network without node names, arbitrary names must be put in
        place, e.g.:

        ``## A B C D``

        Following are the sub-tables of logic conditions for every node. Each
        sub-table nominates a node and its logically connected nodes in
        parenthesis as a comment line:

        ``# A (B C)``

        The rest of the sub-table are states of those nodes in parenthesis
        ``(B, C)`` that would activate the state of ``A``. States that would
        deactivate ``A`` should not be included in the sub-table.

        A complete logic table with 3 nodes A, B, C would look like this:

        ::

            ## A B C
            # A (B C)
            1 0
            1 1
            # B (A)
            1
            # C (B C A)
            1 0 1
            0 1 0
            0 1 1

        Custom comments can be added above or below the table title (as long
        as they are preceded with more or less than two ``#`` (eg ``#`` or
        ``###`` but not ``##``)).

        .. rubric:: Examples:

        .. doctest:: logicnetwork

            >>> myeloid_path = '../neet/boolean/data/myeloid-truth_table.txt'
            >>> net = LogicNetwork.read_table(myeloid_path)
            >>> net.size
            11
            >>> net.names
            ['GATA-2', 'GATA-1', 'FOG-1', 'EKLF', 'Fli-1', 'SCL', 'C/EBPa', 'PU.1', 'cJun', 'EgrNab', 'Gfi-1']

        :param table_path: a path to a truth table file
        :type table_path: str
        :param reduced: passed through to the constructor
        :type reduced: bool
        :returns: a :class:`LogicNetwork`
        :raises FormatError: if the file does not follow the format above
        """
        names_format = re.compile(r'^\s*##[^#]+$')
        node_title_format = re.compile(
            r'^\s*#\s*(\S+)\s*\((\s*(\S+\s*)+)\)\s*$')
        comment_format = re.compile(r'^\s*#.*$')
        separator = re.compile(r'\s*,\s*|\s+')

        with open(table_path, 'r') as f:
            lines = f.read().splitlines()

        # Search for node names: the first ``##`` line that yields at least
        # one name.
        names = []
        start = 0
        for i, line in enumerate(lines):
            if names_format.match(line):
                names = separator.split(line.strip())[1:]
                if names:
                    start = i + 1
                    break
        else:
            raise FormatError("node names not found in file")

        table = [()] * len(names)
        node_index = None  # Index of the node whose sub-table is being read.
        # Create condition tables for each node.
        for line in lines[start:]:
            node_title = node_title_format.match(line)
            if node_title:
                node_name = node_title.group(1)
                if node_name not in names:
                    raise FormatError(
                        "'{}' not in node names".format(node_name))
                node_index = names.index(node_name)
                sub_net_nodes = separator.split(node_title.group(2).strip())

                # ``names.index`` raises ValueError for an unknown source.
                in_nodes = tuple(map(names.index, sub_net_nodes))
                table[node_index] = (in_nodes, set())
            elif comment_format.match(line):
                # Skip a comment.
                continue
            elif not line.strip():
                # Skip an empty line.
                continue
            else:
                # Read activation conditions for node.
                if node_index is None:
                    raise FormatError(
                        "node must be specified before logic conditions")

                condition = separator.split(line.strip())
                if len(condition) != len(table[node_index][0]):
                    raise FormatError(
                        "number of states and nodes must match")
                for state in condition:
                    if state not in ('0', '1'):
                        raise FormatError("node state must be binary")
                table[node_index][1].add(''.join(condition))

        # If no truth table is provided for a node, that node is considered
        # an "external" node, i.e, its state stays on or off by itself.
        for i, sub_table in enumerate(table):
            if not sub_table:  # Empty truth table.
                table[i] = ((i,), {'1'})

        return cls(table, names, reduced)

    @classmethod
    def read_logic(cls, logic_path, external_nodes_path=None, reduced=False):
        """
        Read a network from a file of logic equations.

        A logic equations has the form of ``A = B AND ( C OR D )``, each term
        being separated from parentheses and logic operators with at least a
        space. The optional ``external_nodes_path`` takes a file that
        contains nodes in a column whose states do not depend on any nodes.
        These are considered "external" nodes. Equivalently, such a node
        would have a logic equation ``A = A``, for its state stays on or off
        unless being set externally, but now the node had to be excluded from
        ``external_nodes_path`` to avoid duplication and confusion.

        .. rubric:: Examples

        .. doctest:: logicnetwork

            >>> myeloid_path = '../neet/boolean/data/myeloid-logic_expressions.txt'
            >>> net = LogicNetwork.read_logic(myeloid_path)
            >>> net.size
            11
            >>> net.names
            ['GATA-2', 'GATA-1', 'FOG-1', 'EKLF', 'Fli-1', 'SCL', 'C/EBPa', 'PU.1', 'cJun', 'EgrNab', 'Gfi-1']

        :param logic_path: path to a file of logical expressions
        :type logic_path: str
        :param external_nodes_path: a path to a file of external nodes
        :type external_nodes_path: str
        :param reduced: passed through to the constructor
        :type reduced: bool
        :returns: a :class:`LogicNetwork`
        :raises ValueError: if an expression refers to an unknown node
        """
        names = []
        expressions = []
        with open(logic_path) as eq_file:
            for eq in eq_file:
                if not eq.strip():
                    # Skip an empty line instead of failing to unpack it.
                    continue
                name, expr = eq.split('=')
                names.append(name.strip())
                expressions.append(expr.strip())

        extras = []
        if external_nodes_path:
            with open(external_nodes_path) as extra_file:
                extras = [name.strip() for name in extra_file]
            names += extras

        ops = {'AND', 'OR', 'NOT'}

        table = []
        for expr in expressions:
            sub_nodes = []
            conditions = set()

            # Turn the expression into a format template: each node name
            # becomes a positional placeholder and operators are lowercased
            # into Python's boolean keywords.
            expr_split = expr.split()
            for i, item in enumerate(expr_split):
                # ``item not in '()'`` is a substring test, so it accepts
                # '(', ')' and '()' as structural tokens.
                if item not in ops and item not in '()':
                    if item not in names:
                        raise ValueError(
                            "unknown component '{}'".format(item))
                    if item not in sub_nodes:
                        expr_split[i] = '{' + str(len(sub_nodes)) + '}'
                        sub_nodes.append(item)
                    else:
                        expr_split[i] = '{' + str(sub_nodes.index(item)) + '}'
                else:
                    expr_split[i] = item.lower()
            logic_expr = ' '.join(expr_split)

            indices = tuple([names.index(node) for node in sub_nodes])

            # Enumerate every assignment of the sources and keep the ones
            # for which the expression is true.
            for dec_state in range(2**len(sub_nodes)):
                bin_state = '{0:0{1}b}'.format(dec_state, len(sub_nodes))
                # SECURITY NOTE(review): ``eval`` runs text taken from the
                # logic file; only use this reader on trusted files.
                if eval(logic_expr.format(*bin_state)):
                    conditions.add(bin_state)

            table.append((indices, conditions))

        # Add identity logic tables for external components, which occupy
        # the last ``len(extras)`` positions of ``names``.
        first_extra = len(names) - len(extras)
        for i in range(len(extras)):
            table.append(((first_extra + i,), {'1'}))

        return cls(table, names, reduced)

    def neighbors_in(self, index):
        """
        Return the set of all neighbor nodes, where
        edge(neighbor_node-->index) exists.

        .. rubric:: Examples

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1, 2), {'11', '10'}),
            ...                     ((0,), {'1'}),
            ...                     ((0, 1, 2), {'010', '011', '101'}),
            ...                     ((3,), {'1'})])
            >>> [net.neighbors_in(node) for node in range(net.size)]
            [{1, 2}, {0}, {0, 1, 2}, {3}]

        :param index: node index
        :returns: the set of all node indices which point toward the index
                  node
        """
        return set(self.table[index][0])

    def neighbors_out(self, index):
        """
        Return the set of all neighbor nodes, where
        edge(index-->neighbor_node) exists.

        .. rubric:: Examples

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1, 2), {'11', '10'}),
            ...                     ((0,), {'1'}),
            ...                     ((0, 1, 2), {'010', '011', '101'}),
            ...                     ((3,), {'1'})])
            >>> [net.neighbors_out(node) for node in range(net.size)]
            [{1, 2}, {0, 2}, {0, 2}, {3}]

        :param index: node index
        :returns: the set of all node indices which the index node points to
        """
        # A node is an outgoing neighbor when ``index`` is one of its
        # sources.
        outgoing_neighbors = set()
        for i, row in enumerate(self.table):
            if index in row[0]:
                outgoing_neighbors.add(i)

        return outgoing_neighbors

    def neighbors(self, index):
        """
        Return the set of neighbors (incoming and outgoing) of a node.

        .. rubric:: Examples

        .. doctest:: logicnetwork

            >>> net = LogicNetwork([((1, 2), {'11', '10'}),
            ...                     ((0,), {'1'}),
            ...                     ((0, 1, 2), {'010', '011', '101'}),
            ...                     ((3,), {'1'})])
            >>> [net.neighbors(node) for node in range(net.size)]
            [{1, 2}, {0, 2}, {0, 1, 2}, {3}]

        :param index: node index
        :returns: a set of neighbors of a node
        """
        return self.neighbors_in(index) | self.neighbors_out(index)

    def to_networkx_graph(self, labels='indices'):
        """
        Return networkx graph given neet network.

        :param labels: how node is labeled and thus identified in networkx
                       graph (``'names'`` or ``'indices'``)
        :returns: a ``networkx.DiGraph`` with one edge per
                  (source, target) dependency
        :raises ValueError: if ``labels`` is neither ``'names'`` nor
                            ``'indices'``, or if names are requested but the
                            network has none
        """
        if labels == 'names':
            if hasattr(self, 'names') and (self.names is not None):
                labels = self.names
            else:
                raise ValueError("network nodes do not have names")

        elif labels == 'indices':
            labels = range(self.__size)

        else:
            raise ValueError("labels must be 'names' or 'indices'")

        edges = []
        for i, label in enumerate(labels):
            for j in self.neighbors_out(i):
                edges.append((label, labels[j]))

        return nx.DiGraph(edges)

    def draw(self, labels='indices', filename=None):
        """
        Output a file with a simple network drawing.

        Requires ``networkx`` and ``pygraphviz``.

        Supported image formats are determined by graphviz. In particular,
        pdf support requires ``'cairo'`` and ``'pango'`` to be installed
        prior to graphviz installation.

        :param labels: how node is labeled and thus identified in networkx
                       graph (``'names'`` or ``'indices'``)
        :param filename: filename to write drawing to. Temporary filename
                         will be used if no filename provided.
        :returns: None; the result of ``view_pygraphviz`` is not returned
        """
        nx.nx_agraph.view_pygraphviz(self.to_networkx_graph(
            labels=labels), prog='circo', path=filename)