Clone of https://github.com/NixOS/nixpkgs.git (to stress-test knotserver)
from collections.abc import Iterable
from pathlib import Path
from toolz import curried as tlz
from toolz import curry
import igraph as igraph
import itertools as itertools
import json as json
import os as os
import re as re
import sys

# Debug output is enabled only when the variable is exactly the string "True".
DEBUG = os.environ.get("DEBUG", False) == "True"
DEBUG_PLOT = os.environ.get("DEBUG_PLOT", False) == "True"
# If this is set, the plots will be saved to files instead of being displayed
# with default image viewer.
DEBUG_PLOT_SAVE_BASE_NAME = os.environ.get("DEBUG_PLOT_SAVE_BASE_NAME")

c = igraph.configuration.init()
# App used to open the plots when DEBUG_PLOT_SAVE_BASE_NAME is not set.
c["apps.image_viewer"] = os.environ.get("DEBUG_PLOT_IMAGE_VIEWER", "gwenview")


def debug(*args, **kwargs):
    """Print to stderr like ``print``, but only when DEBUG is enabled."""
    if DEBUG:
        print(*args, file=sys.stderr, **kwargs)


def debug_plot(graph, name, **kwargs):
    """Plot ``graph`` with igraph when DEBUG_PLOT is enabled.

    ``name`` is appended to DEBUG_PLOT_SAVE_BASE_NAME (followed by ".png") to
    build the target file name when that variable is set; otherwise no target
    is passed to ``igraph.plot``. Extra keyword arguments override the default
    plot options below.
    """
    if not DEBUG_PLOT:
        return

    vertex_label = [
        # remove /nix/store/HASH- prefix from labels
        re.split(r"^/nix/store/[a-z0-9]{32}-", vertex_name)[-1]
        for vertex_name in graph.vs["name"]
    ]

    save_as = (
        None if DEBUG_PLOT_SAVE_BASE_NAME is None
        else DEBUG_PLOT_SAVE_BASE_NAME + name + ".png"
    )

    plot_options = tlz.merge(
        {
            # "bbox": (3840, 2160),
            "bbox": (800, 600),
            "margin": 100,
            "vertex_label_dist": -5,
            "edge_color": "orange",
            "vertex_size": 20,
            "vertex_label_size": 30,
            "edge_arrow_size": 2
        },
        kwargs
    )

    igraph.plot(
        graph,
        save_as,
        vertex_label=vertex_label,
        **plot_options,
    )


def debug_plot_with_highligth(g, vs, layout, name="highlight"):
    """Plot ``g`` colouring vertices whose index is in ``vs`` green, rest red.

    ``name`` is forwarded to ``debug_plot`` (which requires it); it defaults
    to "highlight" so existing three-argument callers keep working. Previously
    the name was not passed at all, which made every call raise TypeError.
    """
    debug_plot(
        g,
        name,
        layout=layout,
        # layout=Layout(new_coords),
        vertex_color=[
            "green" if v.index in vs else "red"
            for v in g.vs
        ]
    )


@curry
def pick_keys(keys, d):
    """Return a dict with only those of ``keys`` that are present in ``d``."""
    return {
        key: d[key] for key in keys if key in d
    }


def unnest_iterable(xs):
    """Lazily flatten one level of nesting of an iterable of iterables."""
    return itertools.chain.from_iterable(xs)


def load_json(file_path):
    """Parse the JSON document stored at ``file_path`` and return it."""
    with open(file_path) as f:
        return json.load(f)


@curry
def sorted_by(key, xs):
    """Return ``xs`` as a list sorted by each element's ``key`` item."""
    return sorted(xs, key=lambda x: x[key])


@curry
def find_vertex_by_name_or_none(graph, name):
    """Return the vertex of ``graph`` found for ``name``, or None."""
    try:
        # NOTE: find by name is constant time.
        return graph.vs.find(name)
    # This will be thrown if vertex with given name is not found.
    except ValueError:
        return None


def subcomponent_multi(graph, vertices, mode="out"):
    """Return concatenated subcomponents generated by the given list of
    vertices.
    """
    return tlz.mapcat(
        lambda vertex: graph.subcomponent(vertex, mode=mode),
        vertices
    )


@curry
def edges_for_reference_graph_node(path_to_size_dict, reference_graph_node):
    """Return an iterator of edge dicts for one reference graph node.

    Each edge is ``{"source": <node path>, "target": <reference>}``.
    Self-references are dropped and targets are ordered by their size looked
    up in ``path_to_size_dict``.
    """
    source = reference_graph_node["path"]
    return map(
        lambda x: {"source": source, "target": x},
        sorted(
            filter(
                # references might contain source
                lambda x: x != source,
                reference_graph_node["references"]
            ),
            key=lambda x: 1 * path_to_size_dict[x]
        )
    )


reference_graph_node_keys_to_keep = [
    "closureSize",
    "narSize"
]

pick_reference_graph_node_keys = pick_keys(reference_graph_node_keys_to_keep)


def vertex_from_reference_graph_node(reference_graph_node):
    """Build a vertex dict: the node path as "name" plus the kept size keys."""
    return tlz.merge(
        {"name": reference_graph_node["path"]},
        pick_reference_graph_node_keys(reference_graph_node)
    )


def references_graph_to_igraph(references_graph):
    """
    Converts result of exportReferencesGraph into an igraph directed graph.
    Uses paths as igraph node names, and sets closureSize and narSize as
    properties of igraph nodes.
    """
    debug('references_graph', references_graph)
    references_graph = sorted(references_graph, key=lambda x: 1 * x["narSize"])

    # Short circuit since DictList throws an error if first argument (vertices)
    # contains no elements.
    # The error is: KeyError: 'name'
    # here: https://github.com/igraph/python-igraph/blob/da7484807f5152a2c18c55dd4154653de2c7f5f7/src/igraph/__init__.py#L3091 # noqa: E501
    # This looks like a bug.
    if len(references_graph) == 0:
        return empty_directed_graph()

    path_to_size_dict = {
        node["path"]: node["narSize"] for node in references_graph
    }

    debug('path_to_size_dict', path_to_size_dict)

    return igraph.Graph.DictList(
        map(vertex_from_reference_graph_node, references_graph),
        unnest_iterable(map(
            edges_for_reference_graph_node(path_to_size_dict),
            references_graph
        )),
        directed=True
    )


@curry
def graph_vertex_index_to_name(graph, index):
    """Return the "name" attribute of the vertex at ``index`` in ``graph``."""
    return graph.vs[index]["name"]


def igraph_to_reference_graph(igraph_instance):
    """Convert a graph back into a list of reference graph node dicts.

    Each node has "path" (the vertex name), "references" (names of the
    vertex's successors) and whichever of the kept size keys the vertex has.
    """
    return [
        tlz.merge(
            {
                "path": v["name"],
                "references": list(map(
                    graph_vertex_index_to_name(igraph_instance),
                    igraph_instance.successors(v.index)
                ))
            },
            pick_reference_graph_node_keys(v.attributes())
        )
        for v in igraph_instance.vs
    ]


def load_closure_graph(file_path):
    """Load a references graph JSON file and convert it to an igraph graph."""
    return references_graph_to_igraph(load_json(file_path))


def path_relative_to_file(file_path_from, file_path):
    """Resolve ``file_path`` against the directory of ``file_path_from``."""
    dir_path = Path(file_path_from).parent
    return dir_path / file_path


def is_None(x):
    """Return True if ``x`` is None."""
    return x is None


def not_None(x):
    """Return True if ``x`` is not None."""
    return x is not None


def print_layers(layers):
    """Write the vertex names of every layer graph to the debug output."""
    debug("\n::::LAYERS:::::")
    for index, layer in enumerate(layers):
        debug("")
        debug("layer index:", index)
        debug("[")
        for v in layer.vs["name"]:
            debug(" ", v)
        debug("]")


def print_vs(graph):
    """Write every vertex of ``graph`` to the debug output."""
    for v in graph.vs:
        debug(v)


def directed_graph(edges, vertices=None, vertex_attrs=()):
    """Build a directed graph from edge tuples.

    ``vertices`` optionally lists additional vertices to add to the graph.
    ``vertex_attrs`` is an iterable of ``(vertex_name, attrs_dict)`` pairs
    whose attributes are set on the named vertices. The default is an
    immutable empty tuple rather than a shared mutable list.
    """
    graph = igraph.Graph.TupleList(edges, directed=True)

    # Add detached vertices (without edges) if any.
    if vertices is not None:
        graph = graph + vertices

    # Add vertex attributes if any.
    for (name, attrs_dict) in vertex_attrs:
        vertex = graph.vs.find(name)

        for (k, v) in attrs_dict.items():
            vertex[k] = v

    return graph


def empty_directed_graph():
    """Return a directed graph built from an empty edge list."""
    return directed_graph([])


def graph_is_empty(graph):
    """Return True if ``graph`` has no vertices."""
    return len(graph.vs) == 0


def pick_attrs(attrs, x):
    """Return a dict mapping each name in ``attrs`` to that attribute of x."""
    return {attr: getattr(x, attr) for attr in attrs}


def merge_graphs(graphs):
    """Fold ``graphs`` together with ``+``, starting from an empty graph."""
    return tlz.reduce(lambda acc, g: acc + g, graphs, empty_directed_graph())


# Functions below can be used in user defined pipeline (see pipe.py).
# All functions need to be curried, and the user needs to be able to
# provide values for all arguments apart from the last one from nix code.
@curry
def over(prop_name, func, dictionary):
    """Return ``dictionary`` with ``func`` applied to its ``prop_name`` value.

    Raises KeyError if ``prop_name`` is missing.
    """
    value = dictionary[prop_name]
    return tlz.assoc(dictionary, prop_name, func(value))


# One argument functions also need to be curried to simplify processing of the
# pipeline.
@curry
def flatten(xs):
    """Yield the leaves of an arbitrarily nested iterable.

    For dicts only the values are visited. Strings and bytes are treated as
    leaves rather than iterated character by character.
    """
    xs = xs.values() if isinstance(xs, dict) else xs
    for x in xs:
        if isinstance(x, Iterable) and not isinstance(x, (str, bytes)):
            yield from flatten(x)
        else:
            yield x


@curry
def split_every(count, graph):
    """Split ``graph`` into induced subgraphs of up to ``count`` vertices."""
    vs = graph.vs
    return [
        graph.induced_subgraph(vs[x:x + count])
        for x in range(0, len(vs), count)
    ]


@curry
def limit_layers(max_count, graphs):
    """Lazily limit ``graphs`` to ``max_count`` graphs.

    The first ``max_count - 1`` graphs are passed through unchanged and are
    followed by one graph that merges everything left in the input.
    """
    assert max_count > 0, "max count needs to > 0"

    graphs_iterator = iter(graphs)

    def merged_rest():
        # Merges all graphs remaining in the iterator, after initial
        # max_count - 1 have been taken. Evaluated only when reached, so the
        # leading graphs are consumed from the shared iterator first.
        yield merge_graphs(graphs_iterator)

    return tlz.concat([
        tlz.take(max_count - 1, graphs_iterator),
        merged_rest()
    ])


@curry
def remove_paths(paths, graph):
    """Return ``graph`` without the vertices named in ``paths``.

    Names that have no matching vertex are ignored; if nothing matches, the
    input graph itself is returned.
    """
    # Allow passing a single path.
    if isinstance(paths, str):
        paths = [paths]

    indices_to_remove = tlz.compose(
        list,
        tlz.map(lambda v: v.index),
        tlz.remove(is_None),
        tlz.map(find_vertex_by_name_or_none(graph))
    )(paths)

    return graph - indices_to_remove if len(indices_to_remove) > 0 else graph


@curry
def reverse(iterator):
    """Return a reversed iterator over the items of ``iterator``."""
    return reversed(list(iterator))