The Source Code
https://chrieke.medium.com/documenting-a-python-package-with-code-reference-via-mkdocs-material-b4a45197f95b
The Core-Module
connectable
Connectable
from_dict(self, json, safe=False)
Load class from dictionary.
Source code in msys/core/connectable.py
def from_dict(self, json: dict, safe=False) -> bool:
pass
to_dict(self)
Gerenate dict from class.
Source code in msys/core/connectable.py
def to_dict(self) -> dict:
res = super().to_dict()
res["type"] = self.ctype.to_dict()
if self.input:
res["ingoing"] = self.input().id
if self.outputs:
res["outpoing"] = [o().id for o in self.outputs]
return res
ConnectableFlag
An enumeration.
connection
Connection
find_connection(root, obj0, obj1)
staticmethod
Oberride this function
Returns:
| Type | Description |
|---|---|
[] |
[closest_parent, input, output] |
Source code in msys/core/connection.py
@staticmethod
def find_connection(root:Unit, obj0, obj1) -> []:
"""
Oberride this function
Returns:
[closest_parent, input, output]
"""
if isinstance(obj0, list):
obj0 = root.find(obj0)
if isinstance(obj1, list):
obj1 = root.find(obj1)
if not (issubclass(obj0.__class__,
ConnectableInterface) and issubclass(obj1.__class__,
ConnectableInterface)):
return []
id0 = obj0.complete_id()
id1 = obj1.complete_id()
len_diff = len(id1) - len(id0)
if abs(len_diff) > 1:
return []
if len_diff < 0:
obj1, obj0 = obj0, obj1
id0 = obj0.complete_id()
id1 = obj1.complete_id()
# obj0 is higher or level with obj1
# number of same levels
same_till = 0
for i in range(len(id0)):
if id0[i] != id1[i]:
break
same_till += 1
# no same root
if same_till == 0:
return []
# same level but different branches
if len(id1) - same_till > 2:
return []
parent_id = id0[:same_till]
# determine which one is input and which one is output
def get_type(p_id, obj):
po_diff = len(obj.complete_id()) - len(p_id)
if po_diff == 1:
return obj.get_local()
elif po_diff == 2:
return obj.get_global()
else:
return None
obj0_type = get_type(parent_id, obj0)
if not obj0_type:
return []
obj1_type = get_type(parent_id, obj1)
if not obj1_type:
return []
# cant connect if both have same type (i.e. Input-Input or Output-Output)
if obj0_type == obj1_type:
return []
input = None
output = None
if obj0_type == ConnectableFlag.INPUT:
input, output = obj0, obj1
else:
input, output = obj1, obj0
return [output, input, root.find(parent_id)]
from_dict(self, json, safe=False)
Load class from dictionary.
Source code in msys/core/connection.py
def from_dict(self, json: dict, safe=False) -> bool:
if not super().from_dict(json, safe):
return False
if self.out_ref() is None:
self.out_ref = weakref.ref(parent.find(json.keys()[0]))
elif json.dumps(self.out_ref().complete_id()) == json.keys()[0]:
self.out_ref = weakref.ref(parent.find(json.keys()[0]))
json = json.get(json.dumps(self.out_ref().complete_id()))
if not json:
return False
if self.in_ref() is None:
self.in_ref = weakref.ref(parent.find(json.keys()[0]))
elif json.dumps(self.in_ref().complete_id()) == json.keys()[0]:
self.in_ref = weakref.ref(parent.find(json.keys()[0]))
self.meta = json.get(json.dumps(self.in_ref().complete_id()))
return True
to_dict(self)
Gerenate dict from class.
Source code in msys/core/connection.py
def to_dict(self) -> dict:
input = self.in_ref()
output = self.out_ref()
if input is None or output is None:
del self
return None
return {json.dumps(output.complete_id()): {json.dumps(input.complete_id()): self.meta}}
interfaces
TypeInterface
get_value(self)
Must return representative value for connectivity-checks
Source code in msys/core/interfaces.py
@abstractmethod
def get_value(self):
"""
Must return representative value for connectivity-checks
"""
pass
is_changed(self)
Returns True if value has changed. Important for optimisation, for deciding whether to ignore or process a recipe.
Source code in msys/core/interfaces.py
@abstractmethod
def is_changed(self) -> bool:
"""
Returns True if value has changed.
Important for optimisation, for deciding whether to ignore or process a recipe.
"""
pass
UnitInterface
find(self, id, complete=True)
Returns the first element if it can find the element based on the identifier. If the identifier is complete (i.e. generated by the identifier()) the algorithm will be use a
Source code in msys/core/interfaces.py
@abstractmethod
def find(self, id: [], complete=True):
"""
Returns the first element if it can find the element based on the identifier.
If the identifier is complete (i.e. generated by the identifier()) the algorithm will be use a
"""
pass
find_all(self, id, complete=True)
Returns the elements if it can find the element based on the identifier.
Source code in msys/core/interfaces.py
@abstractmethod
def find_all(self, id: [], complete=True) -> []:
"""
Returns the elements if it can find the element based on the identifier.
"""
pass
metadata
Metadata
from_dict(self, json)
Load class from dictionary.
Source code in msys/core/metadata.py
def from_dict(self, json: dict) -> bool:
if "name" in json.keys():
self.name = json["name"]
if "color" in json.keys():
self.color = json["color"]
if "pos" in json.keys():
if not self.pos:
self.pos = Point()
self.pos.from_dict(json["pos"])
if "inverted" in json.keys():
self.inverted = json["inverted"]
return True
to_dict(self)
Gerenate dict from class.
Source code in msys/core/metadata.py
def to_dict(self) -> dict:
res = dict()
if self.name:
res.update({"name": self.name})
if self.color:
res.update({"color": self.color})
if self.pos:
res.update({"pos": self.pos.to_dict()})
if self.inverted:
res.update({"inverted": self.inverted})
return res
Point
from_dict(self, json)
Load class from dictionary.
Source code in msys/core/metadata.py
def from_dict(self, json: dict) -> bool:
if "x" in json.keys():
self.x = json["x"]
if "y" in json.keys():
self.y = json["y"]
return True
to_dict(self)
Gerenate dict from class.
Source code in msys/core/metadata.py
def to_dict(self) -> dict:
return self.__dict__
module
Module
from_dict(self, json, safe=False)
Load class from dictionary.
Source code in msys/core/module.py
def from_dict(self, json: dict, safe=False) -> bool:
def _from_dict(key, lists, changeable=False):
connectables = json[key]
for new_c in connectables:
if not "id" in new_c:
break
for old_c in lists:
if new_c["id"] == old_c.id:
old_c.from_dict(new_c, safe)
break
if safe and parent.is_protected():
return False
if "options" in json.keys():
_from_dict("options", self.options)
if not UUnit.from_dict(self, json, safe=False):
return False
Registrable.from_dict(self, json)
if "inputs" in json.keys():
self.inputs.from_dict(json["inputs"])
if "outputs" in json.keys():
self.outputs.from_dict(json["outputs"])
return True
is_tree(self)
Source code in msys/core/module.py
def is_tree(self) -> bool:
"""
"""
for i in range(len(self.modules)):
run = []
def move_from(module) -> bool:
run.append(module)
outputs = module.get_outputs()
for out in outputs:
for input in out.get_outgoing():
parent = input.parent
# prevent from going outside
if parent not in self.modules:
continue
# preventing double findings
if parent in run:
# if circle
if parent is self.modules[i]:
return False
continue
if not move_from(parent):
return False
return True
if not move_from(self.modules[i]):
return False
return True
process(self)
Overide this methode! Describs inner proccesing.
Source code in msys/core/module.py
def process(self) -> bool:
"""
Overide this methode!
Describs inner proccesing.
"""
def eval_tim_score(module, dtime):
if dtime <= 1:
dtime = 1
import math
module.time_score = round(math.log10(dtime))
def prioritize(priorities, complexity=0):
if complexity == 0:
return sorted(priorities, key=lambda module: (module.inputs.get_no_connected(),
-module.outputs.get_no_connected()))
if complexity == 1:
# priority by:
# lower connected inputs
# then lower time score
# then lowest change counter: connected inputs - input changes
# then highest changes
# then highest output connections
return sorted(priorities, key=lambda module: (
module.inputs.get_no_connected(),
module.time_score,
module.inputs.get_no_connected() - module.inputs.get_no_changed(),
-module.inputs.get_no_changed(),
module.outputs.get_no_connected()))
return []
self.modules = prioritize(self.modules)
if self.is_tree():
for m in self.modules:
m.update()
return True
# solve complex graph
iteration = 0
max_iterations = 500
start_from = 0
group_changed = True
priority_list = self.modules.copy()
while group_changed:
if iteration > max_iterations:
break
++ iteration
group_changed = False
for m_id in range(start_from, len(priority_list)):
module = priority_list[m_id]
import time
start = time.time()
changed = module.update()
delta_time = time.time() - start
eval_tim_score(module, delta_time)
if changed:
group_changed = True
# ignore modules which can change only once in the future
if module.inputs.get_no_connected() == 0:
start_from = m_id
if not (m_id + 1 < len(self.modules)):
continue
# TODO ignore if connected inputs are 0?
if priority_list[m_id].inputs.get_no_connected() == priority_list[m_id + 1].inputs.get_no_connected():
# register inflicted changes
for m in priority_list:
m.inputs.update_numbers()
priority_list = priority_list[:start_from] + prioritize(priority_list[start_from:], 1)
return True
to_dict(self)
Gerenate dict from class.
Source code in msys/core/module.py
def to_dict(self) -> dict:
res = UUnit.to_dict(self)
res.update(Registrable.to_dict(self))
if self.options:
options = []
for o in self.options:
options.append(o.to_dict())
res["options"] = options
if self.inputs[:]:
res["inputs"] = self.inputs.to_dict()
if self.outputs[:]:
res["outputs"] = self.outputs.to_dict()
if self.modules:
res["modules"] = []
for module in self.modules:
res["modules"].append(module.to_dict())
if self.connections:
res["connections"] = {}
for c in self.connections:
json = c.to_dict()
if json is None:
continue
key = list(json.keys())[0]
json.values
if key in res["connections"].keys():
res["connections"][key].update(list(json.values())[0])
else:
res["connections"].update(json)
return res
option
Option
from_dict(self, json, safe=False)
Load class from dictionary.
Source code in msys/core/option.py
def from_dict(self, json: dict, safe=False) -> bool:
if not super().from_dict(json, safe):
return False
if "id" in json.keys():
self.id = json["id"]
if "title" in json.keys():
self.title = json["title"]
value = None
if "value" in json.keys():
value = json["value"]
if "description" in json.keys():
self.description = json["description"]
if includes(json.keys(), ["selection", "single"]):
self.selection = json["selection"]
self.single = json["single"]
if not includes(self.selection, value):
return False
if len(value) != 1 and self.single:
return False
self.value = value
return True
to_dict(self)
Gerenate dict from class.
Source code in msys/core/option.py
def to_dict(self) -> dict:
res = super().to_dict()
if self.id:
res.update({"id": self.id})
if self.title:
res.update({"title": self.title})
if self.value:
res.update({"value": self.value})
if self.description:
res.update({"description": self.description})
if self.selection:
res.update({"selection": self.selection, "single": self.single})
return res
registrable
Registrable
from_dict(self, json, safe=False)
Load class from dictionary.
Source code in msys/core/registrable.py
def from_dict(self, json: dict, safe=False) -> bool:
if "registered" in json.keys():
res = json["registered"]
if "name" in res.keys():
self.registered_name = res["name"]
if "package" in res.keys():
self.registered_package = res["package"]
return True
return False
to_dict(self)
Gerenate dict from class.
Source code in msys/core/registrable.py
def to_dict(self) -> dict:
res = dict()
if self.registered_name:
res["name"] = self.registered_name
if self.registered_package:
res["package"] = self.registered_package
if res:
return {"registered": res}
else:
return {}
serializer
Serializer
from_dict(self, json, safe=False)
Load class from dictionary.
Source code in msys/core/serializer.py
def from_dict(self, json: dict, safe=False) -> bool:
if safe:
if self.protected:
return False
if "protected" in json.keys():
self.protected = json["protected"]
return True
to_dict(self)
Gerenate dict from class.
Source code in msys/core/serializer.py
def to_dict(self) -> dict:
res = dict()
if self.protected:
res["protected"] = self.protected
return res
SerializerInterface
from_dict(self, json, safe=False)
Load class from dictionary.
Source code in msys/core/serializer.py
@abstractmethod
def from_dict(self, json: dict, safe=False) -> bool:
"""Load class from dictionary."""
pass
to_dict(self)
Gerenate dict from class.
Source code in msys/core/serializer.py
@abstractmethod
def to_dict(self) -> dict:
"""Gerenate dict from class."""
pass
serializer_collections
SerializableList
from_dict(self, json, safe=False)
Load class from dictionary.
Source code in msys/core/serializer_collections.py
def from_dict(self, json: dict, safe=False) -> bool:
success = True
for i in range(len(json)):
if not self.elems[i].from_dict(json[i], safe):
success = False
return success
to_dict(self)
Gerenate dict from class.
Source code in msys/core/serializer_collections.py
def to_dict(self) -> dict:
res = []
for e in self.elems:
res.append(e.to_dict())
return res
type
Type
from_dict(self, json, safe=False)
Load class from dictionary.
Source code in msys/core/type.py
def from_dict(self, json: dict, safe=False) -> bool:
if "mro" in json.keys():
self.mro = json["mro"]
if "value" not in json.keys():
return False
self.value = json["value"]
return True
get_value(self)
Must return representative value for connectivity-checks
Source code in msys/core/type.py
def get_value(self):
return self.value
is_changed(self)
Returns True if value has changed. Important for optimisation, for deciding whether to ignore or process a recipe.
Source code in msys/core/type.py
def is_changed(self) -> bool:
return self.changed
to_dict(self)
Gerenate dict from class.
Source code in msys/core/type.py
def to_dict(self) -> dict:
res = super().to_dict()
res["value"] = self.value
if self.mro:
res["mro"] = self.mro
return res
unit
Unit
find(self, id, complete=True)
Returns the first element if it can find the element based on the identifier. If the identifier is complete (i.e. generated by the identifier()) the algorithm will be use a
Source code in msys/core/unit.py
def find(self, id: [], complete=True):
if complete:
length = len(id)
def _find(elem: UnitInterface, index: int):
tmp = elem.complete_id()
if index >= 0:
if index < length:
if not id[index] == elem.own_id():
return None
else:
if elem.own_id() in id:
index = id.index(elem.own_id())
if index < 0:
return None
if index == length - 1:
return elem
if index < length - 1:
pos = index + 1
for child in elem.get_childs():
res = _find(child, pos)
if res:
return res
return None
return _find(self, -1)
else:
raise NotImplementedError
find_all(self, id, complete=True)
Returns the elements if it can find the element based on the identifier.
Source code in msys/core/unit.py
def find_all(self, id: [], complete=True) -> []:
result = []
if complete:
length = len(id)
def _find(elem: UnitInterface, index: int):
if index >= 0:
if index < length:
if not id[index] == elem.own_id():
return None
else:
if elem.own_id() in id:
index = id.index(elem.own_id())
if index < 0:
return None
if index == length - 1:
result.append(elem)
return elem
if index < length - 1:
pos = index + 1
for child in elem.get_childs():
_find(child, pos)
return None
_find(self, -1)
else:
raise NotImplementedError
return result
from_dict(self, json, safe=False)
Load class from dictionary.
Source code in msys/core/unit.py
def from_dict(self, json: dict, safe=False) -> bool:
if not super().from_dict(json, safe):
return False
if "id" in json.keys():
self.id = json["id"]
if "metadata" in json.keys():
if not self.metadata:
self.metadata = Metadata()
self.metadata.from_dict(json["metadata"])
return True
to_dict(self)
Gerenate dict from class.
Source code in msys/core/unit.py
def to_dict(self) -> dict:
res = super().to_dict()
res["id"] = self.id
res["identifier"] = self.complete_id()
metadata = self.metadata.to_dict()
if metadata:
res["metadata"] = metadata
return res