1from collections.abc import Iterable
2from pathlib import Path
3from toolz import curried as tlz
4from toolz import curry
5import igraph as igraph
6import itertools as itertools
7import json as json
8import os as os
9import re as re
10import sys
11
12DEBUG = os.environ.get("DEBUG", False) == "True"
13DEBUG_PLOT = os.environ.get("DEBUG_PLOT", False) == "True"
14# If this is set, the plots will be saved to files instead of being displayed
15# with default image viewer.
16DEBUG_PLOT_SAVE_BASE_NAME = os.environ.get("DEBUG_PLOT_SAVE_BASE_NAME")
17
18c = igraph.configuration.init()
19# App used to open the plots when DEBUG_PLOT_SAVE_BASE_NAME is not set.
20c["apps.image_viewer"] = os.environ.get("DEBUG_PLOT_IMAGE_VIEWER", "gwenview")
21
22
23def debug(*args, **kwargs):
24 if DEBUG:
25 print(*args, file=sys.stderr, **kwargs)
26
27
28def debug_plot(graph, name, **kwargs):
29 if not DEBUG_PLOT:
30 return
31
32 vertex_label = [
33 # remove /nix/store/HASH- prefix from labels
34 re.split("^/nix/store/[a-z0-9]{32}-", name)[-1]
35 for name in graph.vs["name"]
36 ]
37
38 save_as = (
39 None if DEBUG_PLOT_SAVE_BASE_NAME is None
40 else DEBUG_PLOT_SAVE_BASE_NAME + name + ".png"
41 )
42
43 igraph.plot(
44 graph,
45 save_as,
46 vertex_label=vertex_label,
47 **(tlz.merge(
48 {
49 # "bbox": (3840, 2160),
50 "bbox": (800, 600),
51 "margin": 100,
52 "vertex_label_dist": -5,
53 "edge_color": "orange",
54 "vertex_size": 20,
55 "vertex_label_size": 30,
56 "edge_arrow_size": 2
57 },
58 kwargs
59 )),
60 )
61
62
63def debug_plot_with_highligth(g, vs, layout):
64 debug_plot(
65 g,
66 layout=layout,
67 # layout=Layout(new_coords),
68 vertex_color=[
69 "green" if v.index in vs else "red"
70 for v in g.vs
71 ]
72 )
73
74
75@curry
76def pick_keys(keys, d):
77 return {
78 key: d[key] for key in keys if key in d
79 }
80
81
82def unnest_iterable(xs):
83 return itertools.chain.from_iterable(xs)
84
85
86def load_json(file_path):
87 with open(file_path) as f:
88 return json.load(f)
89
90
91@curry
92def sorted_by(key, xs):
93 return sorted(xs, key=lambda x: x[key])
94
95
96@curry
97def find_vertex_by_name_or_none(graph, name):
98 try:
99 # NOTE: find by name is constant time.
100 return graph.vs.find(name)
101 # This will be thrown if vertex with given name is not found.
102 except ValueError:
103 return None
104
105
106def subcomponent_multi(graph, vertices, mode="out"):
107 """Return concatenated subcomponents generated by the given list of
108 vertices.
109 """
110 return tlz.mapcat(
111 lambda vertex: graph.subcomponent(vertex, mode=mode),
112 vertices
113 )
114
115
116@curry
117def edges_for_reference_graph_node(path_to_size_dict, reference_graph_node):
118 source = reference_graph_node["path"]
119 return map(
120 lambda x: {"source": source, "target": x},
121 sorted(
122 filter(
123 # references might contain source
124 lambda x: x != source,
125 reference_graph_node["references"]
126 ),
127 key=lambda x: 1 * path_to_size_dict[x]
128 )
129 )
130
131
132reference_graph_node_keys_to_keep = [
133 "closureSize",
134 "narSize"
135]
136
137pick_reference_graph_node_keys = pick_keys(reference_graph_node_keys_to_keep)
138
139
140def vertex_from_reference_graph_node(reference_graph_node):
141 return tlz.merge(
142 {"name": reference_graph_node["path"]},
143 pick_reference_graph_node_keys(reference_graph_node)
144 )
145
146
147def references_graph_to_igraph(references_graph):
148 """
149 Converts result of exportReferencesGraph into an igraph directed graph.
150 Uses paths as igraph node names, and sets closureSize and narSize as
151 properties of igraph nodes.
152 """
153 debug('references_graph', references_graph)
154 references_graph = sorted(references_graph, key=lambda x: 1 * x["narSize"])
155
156 # Short circuit since DictList throws an error if first argument (vertices)
157 # contains no elements.
158 # The error is: KeyError: 'name'
159 # here: https://github.com/igraph/python-igraph/blob/da7484807f5152a2c18c55dd4154653de2c7f5f7/src/igraph/__init__.py#L3091 # noqa: E501
160 # This looks like a bug.
161 if len(references_graph) == 0:
162 return empty_directed_graph()
163
164 path_to_size_dict = {
165 node["path"]: node["narSize"] for node in references_graph
166 }
167
168 debug('path_to_size_dict', path_to_size_dict)
169
170 return igraph.Graph.DictList(
171 map(vertex_from_reference_graph_node, references_graph),
172 unnest_iterable(map(
173 edges_for_reference_graph_node(path_to_size_dict),
174 references_graph
175 )),
176 directed=True
177 )
178
179
180@curry
181def graph_vertex_index_to_name(graph, index):
182 return graph.vs[index]["name"]
183
184
185def igraph_to_reference_graph(igraph_instance):
186 return [
187 tlz.merge(
188 {
189 "path": v["name"],
190 "references": list(map(
191 graph_vertex_index_to_name(igraph_instance),
192 igraph_instance.successors(v.index)
193 ))
194 },
195 pick_reference_graph_node_keys(v.attributes())
196 )
197 for v in igraph_instance.vs
198 ]
199
200
201def load_closure_graph(file_path):
202 return references_graph_to_igraph(load_json(file_path))
203
204
205def path_relative_to_file(file_path_from, file_path):
206 dir_path = Path(file_path_from).parent
207 return dir_path / file_path
208
209
210def is_None(x):
211 return x is None
212
213
214def not_None(x):
215 return x is not None
216
217
218def print_layers(layers):
219 debug("\n::::LAYERS:::::")
220 for index, layer in enumerate(layers):
221 debug("")
222 debug("layer index:", index)
223 debug("[")
224 for v in layer.vs["name"]:
225 debug(" ", v)
226 debug("]")
227
228
229def print_vs(graph):
230 for v in graph.vs:
231 debug(v)
232
233
234def directed_graph(edges, vertices=None, vertex_attrs=[]):
235 graph = igraph.Graph.TupleList(edges, directed=True)
236
237 # Add detached vertices (without edges) if any.
238 if vertices is not None:
239 graph = graph + vertices
240
241 # Add vertex attributes if any.
242 for (name, attrs_dict) in vertex_attrs:
243 vertex = graph.vs.find(name)
244
245 for (k, v) in attrs_dict.items():
246 vertex[k] = v
247
248 return graph
249
250
251def empty_directed_graph():
252 return directed_graph([])
253
254
255def graph_is_empty(graph):
256 return len(graph.vs) == 0
257
258
259def pick_attrs(attrs, x):
260 return {attr: getattr(x, attr) for attr in attrs}
261
262
263def merge_graphs(graphs):
264 return tlz.reduce(lambda acc, g: acc + g, graphs, empty_directed_graph())
265
266
267# Functions below can be used in user defined pipeline (see pipe.py).
268# All functions need to be curried, and the user needs to be able to
269# provide values for all arguments apart from the last one from nix code.
270@curry
271def over(prop_name, func, dictionary):
272 value = dictionary[prop_name]
273 return tlz.assoc(dictionary, prop_name, func(value))
274
275
276# One argument functions also need to be curried to simplify processing of the
277# pipeline.
278@curry
279def flatten(xs):
280 xs = xs.values() if isinstance(xs, dict) else xs
281 for x in xs:
282 if isinstance(x, Iterable) and not isinstance(x, (str, bytes)):
283 yield from flatten(x)
284 else:
285 yield x
286
287
288@curry
289def split_every(count, graph):
290 vs = graph.vs
291 return [
292 graph.induced_subgraph(vs[x:x + count])
293 for x in range(0, len(vs), count)
294 ]
295
296
297@curry
298def limit_layers(max_count, graphs):
299 assert max_count > 0, "max count needs to > 0"
300
301 graphs_iterator = iter(graphs)
302
303 return tlz.concat([
304 tlz.take(max_count - 1, graphs_iterator),
305 # Merges all graphs remaining in the iterator, after initial
306 # max_count - 1 have been taken.
307 (lambda: (yield merge_graphs(graphs_iterator)))()
308 ])
309
310
311@curry
312def remove_paths(paths, graph):
313 # Allow passing a single path.
314 if isinstance(paths, str):
315 paths = [paths]
316
317 indices_to_remove = tlz.compose(
318 list,
319 tlz.map(lambda v: v.index),
320 tlz.remove(is_None),
321 tlz.map(find_vertex_by_name_or_none(graph))
322 )(paths)
323
324 return graph - indices_to_remove if len(indices_to_remove) > 0 else graph
325
326
327@curry
328def reverse(iterator):
329 return reversed(list(iterator))