"""
.. currentmodule:: neet.boolean.logicnetwork

.. testsetup:: logicnetwork

    from neet.boolean.logicnetwork import LogicNetwork

Logic-based Networks
====================
"""
import re
from neet.python import long
from neet.statespace import StateSpace
from neet.exceptions import FormatError
import networkx as nx
[docs]class LogicNetwork(object):
"""
The LogicNetwork class represents boolean networks whose update rules
follow logic relations among nodes. Each node state is expressed as ``0``
or ``1``.
"""
def __init__(self, table, names=None, reduced=False):
    """
    Construct a network from a logic truth table.

    A truth table stores a list of tuples, one for each node in order. A
    tuple of the form `(A, {C1, C2, ...})` at index `i` provides the
    activation conditions for the node of index `i`. `A` is a tuple marking
    the indices of the nodes which influence the state of node `i` via
    logic relations. `{C1, C2, ...}` is a set, each element of which is the
    collection of binary states of these influencing nodes that would
    activate node `i`, setting it to `1`. Any other collection of states of
    nodes in `A` are assumed to deactivate node `i`, setting it to `0`.

    `C1`, `C2`, etc. are sequences (`tuple` or `str`) of binary digits,
    each being the binary state of corresponding node in `A`. Every
    condition is normalised to a string of digits before being stored in
    ``self.table``.

    The ``names`` attribute is only assigned when a non-empty ``names``
    argument is given.

    .. rubric:: Examples

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((0,), {'0'})])
        >>> net.size
        1
        >>> net.table
        [((0,), {'0'})]

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1,), {'0', '1'}), ((0,), {'1'})])
        >>> net.size
        2
        >>> net.table == [((1,), {'0', '1'}), ((0,), {'1'})]
        True

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1, 2), {'01', '10'}),
        ...                     ((0, 2), ((0, 1), '10', [1, 1])),
        ...                     ((0, 1), {'11'})], ['A', 'B', 'C'])
        >>> net.size
        3
        >>> net.names
        ['A', 'B', 'C']
        >>> net.table == [((1, 2), {'01', '10'}),
        ...               ((0, 2), {'01', '11', '10'}), ((0, 1), {'11'})]
        True

    :param table: the logic table
    :param names: names of nodes, default None
    :param reduced: if true, :meth:`reduce_table` is applied to the stored
                    table before the network is encoded
    :raises TypeError: if ``table`` (or a non-empty ``names``) is not a
                       list or tuple
    :raises ValueError: if the number of names differs from the number of
                        rows, or a row is not a two-element list/tuple
                        whose second element is a list, tuple or set
    :raises IndexError: if a source index is negative or not smaller than
                        the number of rows
    """
    if not isinstance(table, (list, tuple)):
        raise TypeError("table must be a list or tuple")

    self.__size = len(table)

    if names:
        if not isinstance(names, (list, tuple)):
            raise TypeError("names must be a list or tuple")
        elif len(names) != self.__size:
            raise ValueError("number of names must match network size")
        else:
            self.names = list(names)

    # Store positive truth table for human reader.
    self.table = []
    for row in table:
        if not (isinstance(row, (list, tuple)) and len(row) == 2):
            raise ValueError("Invalid table format")

        # Validate incoming indices. Negative values are rejected as well:
        # they would otherwise be turned into fractional bit masks when the
        # table is encoded.
        for idx in row[0]:
            if idx < 0 or idx >= self.__size:
                raise IndexError("mask index out of range")

        # Validate truth table of the sub net.
        if not isinstance(row[1], (list, tuple, set)):
            raise ValueError("Invalid table format")

        # Normalise every condition to a string of digits, so that
        # (0, 1), [0, 1] and '01' all collapse to the same entry.
        conditions = set()
        for condition in row[1]:
            conditions.add(''.join([str(long(s)) for s in condition]))

        self.table.append((row[0], conditions))

    if reduced:
        self.reduce_table()

    self._state_space = StateSpace(self.__size, base=2)

    # Encode truth table for faster computation.
    self._encode_table()

    self.metadata = {}
@property
def size(self):
    """
    Number of nodes in the network, i.e. the number of rows of the table
    the network was constructed from.

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1, 2), {'01', '10'}),
        ...                     ((0, 2), {'01', '10', '11'}),
        ...                     ((0, 1), {'11'})])
        >>> net.size
        3

    :type: int
    """
    node_count = self.__size
    return node_count
def _encode_table(self):
    """
    Rebuild ``self._encoded_table`` from ``self.table``.

    Each row ``(indices, conditions)`` becomes ``(mask, encoded)``, where
    ``mask`` has bit ``idx`` set for every source index and ``encoded`` is
    the set of integers obtained by setting bit ``idx`` for every source
    whose digit in a condition is non-zero. Low index maps to low-order
    bit.

    Bits are combined with bitwise OR rather than addition, so a source
    index that is listed more than once cannot carry into the bit of an
    unrelated node.
    """
    self._encoded_table = []
    one = long(1)
    for indices, conditions in self.table:
        # Encode the mask.
        mask_code = long(0)
        for idx in indices:
            mask_code |= one << long(idx)  # Low order, low index.

        # Encode each condition of truth table.
        encoded_sub_table = set()
        for condition in conditions:
            encoded_condition = long(0)
            for idx, state in zip(indices, condition):
                if long(state):
                    encoded_condition |= one << long(idx)
            encoded_sub_table.add(encoded_condition)

        self._encoded_table.append((mask_code, encoded_sub_table))
def is_dependent(self, target, source):
    """
    Tell whether the state of ``target`` is influenced by the state of
    ``source``.

    ``source`` must be listed among the inputs of ``target``; on top of
    that, there must be at least one configuration of the remaining inputs
    that appears among the activating conditions with only one of the two
    values of ``source``.

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1, 2), {'01', '10'}),
        ...                     ((0, 2), {'01', '10', '11'}),
        ...                     ((0, 1), {'11'})])
        >>> net.is_dependent(0, 0)
        False
        >>> net.is_dependent(0, 2)
        True

    :param target: index of the target node
    :param source: index of the source node
    :returns: whether the target node is dependent on the source
    """
    row = self.table[target]
    inputs = row[0]
    if source not in inputs:
        # Not even listed as an input: no explicit dependency.
        return False

    position = inputs.index(source)

    # For every configuration of the other inputs, add one when it
    # activates the target with the source on and subtract one otherwise.
    # A zero balance means both values of the source are present.
    balance = {}
    for state in row[1]:
        others = state[:position] + state[position + 1:]
        step = 1 if long(state[position]) == 1 else -1
        balance[others] = balance.get(others, 0) + step

    return any(balance.values())
def reduce_table(self):
    """
    Rewrite ``self.table`` so that every row only lists the sources that
    :meth:`is_dependent` reports as influential, then re-encode the table.

    .. note::

        A row left without any influential source is replaced by a row
        whose only source is the node itself. The example below
        demonstrates this with node ``1``.

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((0,1), {'00', '10'}), ((0,), {'0', '1'})])
        >>> net.table == [((0,1), {'00', '10'}), ((0,), {'0', '1'})]
        True
        >>> net.reduce_table()
        >>> net.table == [((1,), {'0'}), ((1,), {'1'})]
        True
    """
    new_table = []
    for node, (sources, conditions) in enumerate(self.table):
        # Positions (within the row) and indices of the sources that matter.
        kept = [(position, source)
                for position, source in enumerate(sources)
                if self.is_dependent(node, source)]

        if kept:
            # Project every condition onto the influential positions.
            new_sources = tuple([source for _, source in kept])
            new_conditions = set()
            for condition in conditions:
                digits = [str(condition[position]) for position, _ in kept]
                new_conditions.add(''.join(digits))
        else:
            # No source matters, not even the node itself.
            new_sources = (node, )
            if not conditions:
                # Nothing activates the node: it is never switched on.
                new_conditions = set()
            elif node in sources:
                # Switched on whatever its previous state was.
                new_conditions = {'0', '1'}
            else:
                # NOTE(review): before reduction such a node is switched
                # on for every state of its sources; this row makes it
                # keep its own state instead. That is the behaviour shown
                # in the example above -- confirm it is intended.
                new_conditions = {'1'}

        new_table.append((new_sources, new_conditions))

    self.table = new_table
    self._encode_table()
def state_space(self):
    """
    Give access to the state space object stored on the network.

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1, 2), {'01', '10'}),
        ...                     ((0, 2), {'01', '10', '11'}),
        ...                     ((0, 1), {'11'})])
        >>> net.state_space()
        <neet.statespace.StateSpace object at 0x...>
        >>> space = net.state_space()
        >>> list(space)
        [[0, 0, 0], [1, 0, 0], [0, 1, 0], [1, 1, 0], [0, 0, 1], [1, 0, 1], [0, 1, 1], [1, 1, 1]]

    :returns: the network's :class:`neet.statespace.StateSpace`
    """
    space = self._state_space
    return space
def _unsafe_update(self, net_state, index=None, pin=None, values=None):
    """
    Unsafely update node states according to the truth table.

    If ``index`` is provided, only update the node at ``index``. If
    ``index`` is not provided, update all nodes. No argument is validated.

    .. note::

        ``net_state`` is modified **in place** and the very same object is
        returned. The state is encoded once before any node is written, so
        every node is computed from the state as it was on entry.

    Nodes listed in ``pin`` are skipped. Entries of ``values`` are written
    last and therefore override whatever the truth table produced.

    .. rubric:: Examples

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((0,), {'0'})])
        >>> net._unsafe_update([0], 0)
        [1]
        >>> net._unsafe_update([1])
        [0]

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1,), {'0', '1'}), ((0,), {'1'})])
        >>> net._unsafe_update([1, 0], 0)
        [1, 0]
        >>> net._unsafe_update([1, 0], 1)
        [1, 1]
        >>> net._unsafe_update([0, 0])
        [1, 0]

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1, 2), {'01', '10'}),
        ...                     ((0, 2), {(0, 1), '10', (1, 1)}),
        ...                     ((0, 1), {'11'})])
        >>> net.size
        3
        >>> net._unsafe_update([0, 1, 0])
        [1, 0, 0]
        >>> net._unsafe_update([0, 0, 1])
        [1, 1, 0]
        >>> net._unsafe_update([0, 0, 1], 1)
        [0, 1, 1]
        >>> net._unsafe_update([0, 0, 1], pin=[1])
        [1, 0, 0]
        >>> net._unsafe_update([0, 0, 1], pin=[0, 1])
        [0, 0, 0]
        >>> net._unsafe_update([0, 0, 1], values={0: 0})
        [0, 1, 0]
        >>> net._unsafe_update([0, 0, 1], pin=[1], values={0: 0})
        [0, 0, 0]

    :param net_state: a mutable sequence of binary node states
    :type net_state: sequence
    :param index: the index to update (or None)
    :type index: int or None
    :param pin: the indices to pin (or None)
    :type pin: sequence
    :param values: override values
    :type values: dict
    :returns: ``net_state`` itself, after being updated
    """
    # Snapshot of the incoming state; all nodes are evaluated against it.
    encoded_state = self.state_space()._unsafe_encode(net_state)

    if index is None:
        indices = range(self.__size)
    else:
        indices = [index]

    if pin is None:
        pin = []

    for idx in indices:
        if idx in pin:
            continue
        mask, condition = self._encoded_table[idx]
        # Keep only the bits of the sources of this node.
        sub_net_state = mask & encoded_state
        net_state[idx] = 1 if sub_net_state in condition else 0

    if values:
        for k, v in values.items():
            net_state[k] = v

    return net_state
def update(self, net_state, index=None, pin=None, values=None):
    """
    Validate the arguments, then update node states according to the
    truth table.

    If ``index`` is provided, only the node at ``index`` is updated;
    otherwise all nodes are. ``pin`` lists the indices of the nodes whose
    states are forced to remain unchanged, and ``values`` maps node
    indices to the states they must take. Once the checks pass, the
    arguments are handed to :meth:`_unsafe_update` and its result is
    returned.

    .. rubric:: Examples

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((0,), {'0'})])
        >>> net.update([0], 0)
        [1]
        >>> net.update([1])
        [0]

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1,), {'0', '1'}), ((0,), {'1'})])
        >>> net.update([1, 0], 0)
        [1, 0]
        >>> net.update([1, 0], 1)
        [1, 1]
        >>> net.update([0, 0])
        [1, 0]

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1, 2), {'01', '10'}),
        ...                     ((0, 2), {(0, 1), '10', (1, 1)}),
        ...                     ((0, 1), {'11'})])
        >>> net.size
        3
        >>> net.update([0, 0, 1], 1)
        [0, 1, 1]
        >>> net.update([0, 1, 0])
        [1, 0, 0]
        >>> net.update([0, 0, 1])
        [1, 1, 0]
        >>> net.update([0, 0, 1], pin=[1])
        [1, 0, 0]
        >>> net.update([0, 0, 1], pin=[0, 1])
        [0, 0, 0]
        >>> net.update([0, 0, 1], values={0: 0})
        [0, 1, 0]
        >>> net.update([0, 0, 1], pin=[1], values={0: 0})
        [0, 0, 0]

    :param net_state: a sequence of binary node states
    :type net_state: sequence
    :param index: the index to update (or None)
    :type index: int or None
    :param pin: the indices to pin (or None)
    :type pin: sequence
    :param values: override values
    :type values: dict
    :returns: the updated states
    :raises ValueError: if ``net_state`` is not in the state space, if an
                        override value is neither ``0`` nor ``1``, or if an
                        override targets a pinned node
    """
    if net_state not in self.state_space():
        raise ValueError(
            "the provided state is not in the network's state space")

    if values:
        # Overrides must be binary ...
        for forced in values.values():
            if forced not in (0, 1):
                raise ValueError("invalid state in values argument")
        # ... and must not fight with a pin.
        if pin:
            for node in values:
                if node in pin:
                    raise ValueError(
                        "cannot set a value for a pinned state")

    return self._unsafe_update(net_state, index, pin, values)
@classmethod
def read_table(cls, table_path, reduced=False):
    """
    Read a network from a truth table file.

    A logic table file starts with a table title which contains names of
    all nodes. It is a line marked by ``##`` at the beginning with node
    names separated by commas or spaces. This line is required. For
    artificial network without node names, arbitrary names must be put in
    place, e.g.:

    ``## A B C D``

    Following are the sub-tables of logic conditions for every node. Each
    sub-table nominates a node and its logically connected nodes in
    parenthesis as a comment line:

    ``# A (B C)``

    The rest of the sub-table are states of those nodes in parenthesis
    ``(B, C)`` that would activate the state of A. States that would
    deactivate ``A`` should not be included in the sub-table.

    A complete logic table with 3 nodes A, B, C would look like this:

    ::

        ## A B C
        # A (B C)
        1 0
        1 1
        # B (A)
        1
        # C (B C A)
        1 0 1
        0 1 0
        0 1 1

    Custom comments can be added above or below the table title (as long as
    they are preceded with more or less than two ``#`` (eg ``#`` or
    ``###`` but not ``##``)). Empty lines are skipped. A node that never
    gets a sub-table is given the row ``((i,), {'1'})``.

    .. rubric:: Examples:

    .. doctest:: logicnetwork

        >>> myeloid_path = '../neet/boolean/data/myeloid-truth_table.txt'
        >>> net = LogicNetwork.read_table(myeloid_path)
        >>> net.size
        11
        >>> net.names
        ['GATA-2', 'GATA-1', 'FOG-1', 'EKLF', 'Fli-1', 'SCL', 'C/EBPa', 'PU.1', 'cJun', 'EgrNab', 'Gfi-1']

    :param table_path: a path to a table table file
    :type table_path: str
    :param reduced: forwarded to the class constructor
    :returns: a :class:`LogicNetwork`
    :raises FormatError: if no title line is found, a sub-table names an
                         unknown node, conditions appear before any
                         sub-table title, a condition has the wrong number
                         of states, or a state is not ``0`` or ``1``
    :raises ValueError: if a source listed in parentheses is not one of
                        the declared node names
    """
    names_format = re.compile(r'^\s*##[^#]+$')
    # The list inside the parentheses is matched as one token followed by
    # whitespace-separated tokens. The earlier nested form `(\S+\s*)+`
    # matches the same titles but backtracks exponentially on a comment
    # that opens a parenthesis and never closes it at the end of the line.
    node_title_format = re.compile(
        r'^\s*#\s*(\S+)\s*\(\s*(\S+(?:\s+\S+)*)\s*\)\s*$')
    comment_format = re.compile(r'^\s*#.*$')
    separator = re.compile(r'\s*,\s*|\s+')

    with open(table_path, 'r') as f:
        lines = f.read().splitlines()

    # Search for node names; `start` ends up just past the title line.
    names = []
    start = 0
    for start, line in enumerate(lines, 1):
        if names_format.match(line):
            names = separator.split(line.strip())[1:]
            if names:
                break
    if not names:
        raise FormatError("node names not found in file")

    table = [()] * len(names)

    # Index of the node whose sub-table is being read; None until the
    # first sub-table title has been seen.
    node_index = None

    # Create condition tables for each node.
    for line in lines[start:]:
        node_title = node_title_format.match(line)
        if node_title:
            node_name = node_title.group(1)
            # Read specifications for node.
            if node_name not in names:
                raise FormatError(
                    "'{}' not in node names".format(node_name))
            node_index = names.index(node_name)
            sub_net_nodes = separator.split(node_title.group(2).strip())
            in_nodes = tuple(map(names.index, sub_net_nodes))
            table[node_index] = (in_nodes, set())
            continue

        if comment_format.match(line):
            # Skip a comment.
            continue

        stripped = line.strip()
        if not stripped:
            # Skip an empty line.
            continue

        # Read activation conditions for node.
        if node_index is None:
            raise FormatError(
                "node must be specified before logic conditions")

        condition = separator.split(stripped)
        if len(condition) != len(table[node_index][0]):
            raise FormatError(
                "number of states and nodes must match")
        for state in condition:
            if state not in ('0', '1'):
                raise FormatError("node state must be binary")
        table[node_index][1].add(''.join(condition))

    # If no truth table is provided for a node, that node is considered
    # an "external" node, i.e, its state stays on or off by itself.
    for i, sub_table in enumerate(table):
        if not sub_table:  # Empty truth table.
            table[i] = ((i,), {'1'})

    return cls(table, names, reduced)
@classmethod
def read_logic(cls, logic_path, external_nodes_path=None, reduced=False):
    """
    Read a network from a file of logic equations.

    A logic equations has the form of ``A = B AND ( C OR D )``, each term
    being separated from parentheses and logic operators with at least a
    space. Blank lines are ignored. The optional ``external_nodes_path``
    takes a file that contains nodes in a column whose states do not
    depend on any nodes. These are considered "external" nodes.
    Equivalently, such a node would have a logic equation ``A = A``, for
    its state stays on or off unless being set externally, but now the
    node had to be excluded from ``external_nodes_path`` to avoid
    duplication and confusion.

    .. rubric:: Examples

    .. doctest:: logicnetwork

        >>> myeloid_path = '../neet/boolean/data/myeloid-logic_expressions.txt'
        >>> net = LogicNetwork.read_logic(myeloid_path)
        >>> net.size
        11
        >>> net.names
        ['GATA-2', 'GATA-1', 'FOG-1', 'EKLF', 'Fli-1', 'SCL', 'C/EBPa', 'PU.1', 'cJun', 'EgrNab', 'Gfi-1']

    :param logic_path: path to a file of logical expressions
    :type logic_path: str
    :param external_nodes_path: a path to a file of external nodes
    :type external_nodes_path: str
    :param reduced: forwarded to the class constructor
    :returns: a :class:`LogicNetwork`
    :raises ValueError: if a non-blank line is not of the form
                        ``NAME = EXPRESSION`` or an expression mentions an
                        unknown component
    """
    names = []
    expressions = []
    with open(logic_path) as eq_file:
        for line_number, eq in enumerate(eq_file, 1):
            if not eq.strip():
                # A blank line (e.g. at the end of the file) is not an
                # equation.
                continue
            name, equal_sign, expr = eq.partition('=')
            if not equal_sign or '=' in expr:
                raise ValueError(
                    "line {} is not of the form 'NAME = EXPRESSION'".format(
                        line_number))
            names.append(name.strip())
            expressions.append(expr.strip())

    extras = []
    if external_nodes_path:
        with open(external_nodes_path) as extra_file:
            # Blank lines must not turn into nodes with an empty name.
            extras = [name.strip() for name in extra_file if name.strip()]
        names += extras

    ops = {'AND', 'OR', 'NOT'}

    table = []
    for expr in expressions:
        sub_nodes = []
        conditions = set()

        # Replace every node name by a positional placeholder and lower
        # the operators, which turns the equation into a template of a
        # Python boolean expression.
        expr_split = expr.split()
        for i, item in enumerate(expr_split):
            if item not in ops and item not in '()':
                if item not in names:
                    raise ValueError("unknown component '{}'".format(item))
                if item not in sub_nodes:
                    expr_split[i] = '{' + str(len(sub_nodes)) + '}'
                    sub_nodes.append(item)
                else:
                    expr_split[i] = '{' + str(sub_nodes.index(item)) + '}'
            else:
                expr_split[i] = item.lower()
        logic_expr = ' '.join(expr_split)

        indices = tuple([names.index(node) for node in sub_nodes])

        # Try every assignment of the sources; digit k of `bin_state` is
        # the state of `sub_nodes[k]`.
        for dec_state in range(2**len(sub_nodes)):
            bin_state = '{0:0{1}b}'.format(dec_state, len(sub_nodes))
            # SECURITY: `eval` runs on text derived from the input file.
            # Only `and`/`or`/`not`, parentheses and the digits 0/1 reach
            # it (anything else was rejected above); builtins are removed
            # as a second line of defence.
            if eval(logic_expr.format(*bin_state), {'__builtins__': {}}):
                conditions.add(bin_state)

        table.append((indices, conditions))

    # Add identity logic tables for external components.
    first_external = len(names) - len(extras)
    for offset in range(len(extras)):
        table.append(((first_external + offset,), set('1')))

    return cls(table, names, reduced)
def neighbors_in(self, index):
    """
    Collect the incoming neighbors of a node, i.e. every node listed as a
    source in the table row of ``index`` (edge ``neighbor --> index``).

    .. rubric:: Examples

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1, 2), {'11', '10'}),
        ...                     ((0,), {'1'}),
        ...                     ((0, 1, 2), {'010', '011', '101'}),
        ...                     ((3,), {'1'})])
        >>> [net.neighbors_in(node) for node in range(net.size)]
        [{1, 2}, {0}, {0, 1, 2}, {3}]

    :param index: node index
    :returns: the set of all node indices which point toward the index node
    """
    row = self.table[index]
    incoming = set()
    incoming.update(row[0])
    return incoming
def neighbors_out(self, index):
    """
    Collect the outgoing neighbors of a node, i.e. every node whose table
    row lists ``index`` as a source (edge ``index --> neighbor``).

    .. rubric:: Examples

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1, 2), {'11', '10'}),
        ...                     ((0,), {'1'}),
        ...                     ((0, 1, 2), {'010', '011', '101'}),
        ...                     ((3,), {'1'})])
        >>> [net.neighbors_out(node) for node in range(net.size)]
        [{1, 2}, {0, 2}, {0, 2}, {3}]

    :param index: node index
    :returns: the set of all node indices which the index node points to
    """
    return {target
            for target, row in enumerate(self.table)
            if index in row[0]}
def neighbors(self, index):
    """
    Collect all neighbors of a node: the union of what
    :meth:`neighbors_in` and :meth:`neighbors_out` give for ``index``.

    .. rubric:: Examples

    .. doctest:: logicnetwork

        >>> net = LogicNetwork([((1, 2), {'11', '10'}),
        ...                     ((0,), {'1'}),
        ...                     ((0, 1, 2), {'010', '011', '101'}),
        ...                     ((3,), {'1'})])
        >>> [net.neighbors(node) for node in range(net.size)]
        [{1, 2}, {0, 2}, {0, 1, 2}, {3}]

    :param index: node index
    :returns: a set of neighbors of a node
    """
    incoming = self.neighbors_in(index)
    outgoing = self.neighbors_out(index)
    return incoming.union(outgoing)
def to_networkx_graph(self, labels='indices'):
    """
    Return networkx graph given neet network.

    Every node of the network becomes a node of the graph, including
    nodes that take part in no edge. An edge ``a --> b`` is added
    whenever ``b`` is among ``neighbors_out(a)``. The graph attribute
    ``name`` is taken from ``self.metadata.get('name')``.

    :param labels: how node is labeled and thus identified in networkx
                   graph (``'names'`` or ``'indices'``)
    :returns: a ``networkx.DiGraph``
    :raises ValueError: if ``labels`` is neither ``'names'`` nor
                        ``'indices'``, or names are requested but the
                        network has none
    """
    if labels == 'names':
        if hasattr(self, 'names') and (self.names is not None):
            labels = self.names
        else:
            raise ValueError("network nodes do not have names")
    elif labels == 'indices':
        labels = range(self.__size)
    else:
        raise ValueError("labels must be 'names' or 'indices'")

    graph = nx.DiGraph(name=self.metadata.get('name'))

    # Register the nodes explicitly: a graph built from an edge list alone
    # would silently omit every node that has no edge.
    graph.add_nodes_from(labels)

    for i in range(len(labels)):
        for j in self.neighbors_out(i):
            graph.add_edge(labels[i], labels[j])

    return graph
def draw(self, labels='indices', filename=None):
    """
    Output a file with a simple network drawing.

    The network is converted with :meth:`to_networkx_graph` and handed to
    ``networkx.nx_agraph.view_pygraphviz`` using the ``circo`` layout
    program. Requires ``networkx`` and ``pygraphviz``.

    Supported image formats are determined by ``graphviz``. In particular,
    pdf support requires ``'cairo'`` and ``'pango'`` to be installed prior
    to graphviz installation.

    This method has no ``return`` statement; whatever
    ``view_pygraphviz`` produces is discarded.

    :param labels: how node is labeled and thus identified in networkx
                   graph (``'names'`` or ``'indices'``)
    :param filename: filename to write drawing to, passed on as ``path``
    """
    graph = self.to_networkx_graph(labels=labels)
    nx.nx_agraph.view_pygraphviz(graph, prog='circo', path=filename)