Distances on Directed Graphs in R
at main 464 lines 15 kB view raw
1#include "run_sp.h" 2#include "pathfinders.h" 3#include "heaps/heap.h" 4 5#include <algorithm> // std::fill 6#include <fstream> // file output for parallel jobs 7#include <unordered_set> 8 9/************************************************************************* 10 * Direct implementation of 11 * "A Faster Algorithm for Betweenness Centrality", Ulrik Brandes (2001) 12 * Journal of Mathematical Sociology 25(2):163-177 13 * - same algorithm as used in igraph and networkx 14 *************************************************************************/ 15 16const double epsilon = DBL_MIN; // edge weight comparison == 0 17// see 18// https://github.com/igraph/igraph/blob/96c2cc751063bf3ba7e920e99793956013cef6b5/include/igraph_nongraph.h#L41 19// which defines epsilon as 1e-10. That value is passed to `igraph_cmp_epsilon`, 20// which is defined at 21// https://github.com/igraph/igraph/blob/96c2cc751063bf3ba7e920e99793956013cef6b5/src/math/utils.c#L108 22// and uses a comparison of fabs (diff) < (eps * DBL_MIN), 23// where DBL_MIN is the standard lower, non-zero limit 24// https://en.cppreference.com/w/cpp/types/numeric_limits/min 25// of generally around 1e-308, so (eps * DBL_MIN) is then sub-normal. This code 26// just uses an epsilon equal to DBL_MIN. 27 28// # nocov start 29template <typename T> 30void inst_graph (std::shared_ptr<DGraph> g, size_t nedges, 31 const std::map <std::string, size_t>& vert_map, 32 const std::vector <std::string>& from, 33 const std::vector <std::string>& to, 34 const std::vector <T>& dist, 35 const std::vector <T>& wt) 36{ 37 38 std::unordered_set < std::pair <size_t, size_t>, centrality::edge_pair_hash > edge_set; 39 40 for (size_t i = 0; i < nedges; ++i) 41 { 42 size_t fromi = vert_map.at(from [i]); 43 size_t toi = vert_map.at(to [i]); 44 45 std::pair <size_t, size_t> edge_pair {fromi, toi}; 46 if (edge_set.find (edge_pair) == edge_set.end ()) 47 { 48 edge_set.emplace (edge_pair); 49 g->addNewEdge (fromi, toi, dist [i], wt [i], i); 50 } 51 52 } 53} 54// # nocov end 55 56struct OneCentralityVert : public RcppParallel::Worker 57{ 58 size_t nverts; // can't be const because of reinterpret case 59 const std::string heap_type; 60 const std::vector <double> vert_wts; 61 const double dist_threshold; 62 std::shared_ptr <DGraph> g; 63 64 std::vector <double> output; 65 66 // Constructor 1: The main constructor 67 OneCentralityVert ( 68 const size_t nverts_in, 69 const std::string heap_type_in, 70 const std::vector <double> vert_wts_in, 71 const double dist_threshold_in, 72 const std::shared_ptr <DGraph> g_in) : 73 nverts (nverts_in), heap_type (heap_type_in), 74 vert_wts (vert_wts_in), dist_threshold (dist_threshold_in), 75 g (g_in), output () 76 { 77 output.resize (nverts, 0.0); 78 } 79 80 // Constructor 2: The Split constructor 81 OneCentralityVert ( 82 const OneCentralityVert &oneCentralityVert, 83 RcppParallel::Split) : 84 nverts (oneCentralityVert.nverts), 85 heap_type (oneCentralityVert.heap_type), 86 vert_wts (oneCentralityVert.vert_wts), 87 dist_threshold (oneCentralityVert.dist_threshold), 88 g (oneCentralityVert.g), output () 89 { 90 output.resize (nverts, 0.0); 91 } 92 93 94 // Parallel function operator 95 void operator() (size_t begin, size_t end) 96 { 97 std::shared_ptr<PF::PathFinder> pathfinder = 98 std::make_shared <PF::PathFinder> (nverts, 99 *run_sp::getHeapImpl (heap_type), g); 100 101 std::vector <double> cent (nverts, 0.0); 102 103 for (size_t v = begin; v < end; v++) 104 { 105 if (RcppThread::isInterrupted (v % static_cast<int>(100) == 0)) 106 return; 107 const double vwt = vert_wts [v]; 108 pathfinder->Centrality_vertex (cent, 109 static_cast <size_t> (v), 110 vwt, dist_threshold); 111 } 112 113 for (size_t i = 0; i < nverts; i++) 114 output [i] += cent [i]; 115 } 116 117 void join (const OneCentralityVert &rhs) 118 { 119 for (size_t i = 0; i < output.size (); i++) 120 output [i] += rhs.output [i]; 121 } 122}; 123 124struct OneCentralityEdge : public RcppParallel::Worker 125{ 126 size_t nverts; // can't be const because of reinterpret case 127 size_t nedges; 128 const std::string heap_type; 129 const std::vector <double> vert_wts; 130 const double dist_threshold; 131 std::shared_ptr <DGraph> g; 132 133 std::vector <double> output; 134 135 // Constructor 1: The main constructor 136 OneCentralityEdge ( 137 const size_t nverts_in, 138 const size_t nedges_in, 139 const std::string heap_type_in, 140 const std::vector <double> vert_wts_in, 141 const double dist_threshold_in, 142 const std::shared_ptr <DGraph> g_in) : 143 nverts (nverts_in), nedges (nedges_in), 144 heap_type (heap_type_in), vert_wts (vert_wts_in), 145 dist_threshold (dist_threshold_in), 146 g (g_in), output () 147 { 148 output.resize (nedges, 0.0); 149 } 150 151 // Constructor 2: The Split constructor 152 OneCentralityEdge ( 153 const OneCentralityEdge &oneCentralityEdge, 154 RcppParallel::Split) : 155 nverts (oneCentralityEdge.nverts), 156 nedges (oneCentralityEdge.nedges), 157 heap_type (oneCentralityEdge.heap_type), 158 vert_wts (oneCentralityEdge.vert_wts), 159 dist_threshold (oneCentralityEdge.dist_threshold), 160 g (oneCentralityEdge.g), 161 output () 162 { 163 output.resize (nedges, 0.0); 164 } 165 166 // Parallel function operator 167 void operator() (size_t begin, size_t end) 168 { 169 std::shared_ptr<PF::PathFinder> pathfinder = 170 std::make_shared <PF::PathFinder> (nverts, 171 *run_sp::getHeapImpl (heap_type), g); 172 173 std::vector <double> cent (nedges, 0.0); 174 175 for (size_t v = begin; v < end; v++) 176 { 177 if (RcppThread::isInterrupted (v % static_cast<int>(100) == 0)) 178 return; 179 const double vwt = vert_wts [v]; 180 pathfinder->Centrality_edge (cent, v, vwt, nedges, dist_threshold); 181 } 182 for (size_t i = 0; i < nedges; i++) 183 output [i] += cent [i]; 184 } 185 186 void join (const OneCentralityEdge &rhs) 187 { 188 for (size_t i = 0; i < output.size (); i++) 189 output [i] += rhs.output [i]; 190 } 191}; 192 193void PF::PathFinder::Centrality_vertex ( 194 std::vector <double>& cent, 195 const size_t s, 196 const double vert_wt, 197 const double dist_threshold) 198{ 199 const DGraphEdge *edge; 200 201 const size_t n = m_graph->nVertices(); 202 const std::vector <DGraphVertex>& vertices = m_graph->vertices(); 203 204 std::deque <size_t> v_stack; 205 206 std::vector <double> w (n, 0.0); 207 w [s] = 1.0; 208 209 m_heap->insert (s, -1.0); 210 211 // sigma as long int because graphs can be bigger than INT_MAX 212 std::vector <long int> sigma (n, 0); 213 sigma [s] = 1L; 214 215 std::vector <std::vector <size_t> > prev_vert (n); 216 217 while (m_heap->nItems() > 0) { 218 size_t v = m_heap->deleteMin(); 219 220 if (w [v] > dist_threshold) 221 continue; 222 223 v_stack.push_back (v); 224 225 edge = vertices [v].outHead; 226 227 // NOTE: This 'target_set', and the one below in 'centrality_edge', are 228 // only needed for graphs which are submitted with (potentially) 229 // duplicate edges. The set is used to ensure centrality values are only 230 // aggregated once along any set of duplicated edges. The effect of not 231 // doing this is documented at 232 // https://github.com/ATFutures/dodgr/issues/186. 233 // 234 // Nevertheless, the other commits flagged in that issue add a function 235 // to the internal 'inst_graph' function at the top of this file ensures 236 // no duplicated edges are added, so this 'target_set' is not actually 237 // needed. The issue shows that it does not decrease computational 238 // efficiency here, so it's left for the moment, but can easily be 239 // removed later. 240 std::unordered_set <size_t> target_set; 241 242 while (edge) { 243 244 size_t et = edge->target; 245 double wt = w [v] + edge->wt; 246 247 if (target_set.find (et) == target_set.end ()) 248 { 249 target_set.emplace (et); 250 251 if (w [et] == 0.0) // first connection to et 252 { 253 prev_vert [et] = std::vector <size_t> (1L, v); 254 255 sigma [et] = sigma [v]; 256 w [et] = wt; 257 m_heap->insert (et, wt); 258 } else if (wt < w [et]) 259 { 260 prev_vert [et] = std::vector <size_t> (1L, v); 261 262 sigma [et] = sigma [v]; 263 w [et] = wt; 264 m_heap->decreaseKey (et, wt); 265 } else if (fabs (wt - w [et]) < epsilon) 266 { 267 std::vector <size_t> vert_vec = prev_vert [et]; 268 vert_vec.push_back (v); 269 prev_vert [et] = vert_vec; 270 271 sigma [et] += sigma [v]; 272 } 273 } 274 275 edge = edge->nextOut; 276 } 277 } // end while nItems > 0 278 279 // Then read from the stack and count centrality paths 280 std::vector <double> delta (n, 0.0); 281 while (!v_stack.empty ()) 282 { 283 const size_t v = v_stack.back (); 284 v_stack.pop_back (); 285 std::vector <size_t> vert_vec = prev_vert [v]; 286 double tempd = (1.0 + delta [v]) / static_cast <double> (sigma [v]); 287 for (auto ws: vert_vec) 288 { 289 delta [ws] += static_cast <double> (sigma [ws]) * tempd; 290 } 291 if (v != s) 292 cent [v] += vert_wt * delta [v]; 293 } 294} 295 296 297void PF::PathFinder::Centrality_edge ( 298 std::vector <double>& cent, 299 const size_t s, 300 const double vert_wt, 301 const size_t nedges, 302 const double dist_threshold) 303{ 304 const DGraphEdge *edge; 305 306 const size_t n = m_graph->nVertices(); 307 const std::vector <DGraphVertex>& vertices = m_graph->vertices(); 308 309 std::deque <size_t> v_stack; 310 311 std::vector <double> w (n, 0.0); 312 w [s] = 1.0; 313 314 m_heap->insert (s, -1.0); 315 316 std::vector <long int> sigma (n, 0); 317 sigma [s] = 1L; 318 std::vector <long int> sigma_edge (nedges, 0); 319 320 std::vector <std::vector <size_t> > prev_vert (n), prev_edge (n); 321 322 while (m_heap->nItems() > 0) { 323 324 size_t v = m_heap->deleteMin(); 325 326 if (w [v] > dist_threshold) 327 continue; 328 329 v_stack.push_back (v); 330 331 edge = vertices [v].outHead; 332 333 // See comment in 'centrality_vertex', above, about this 'target_set'. 334 std::unordered_set <size_t> target_set; 335 336 while (edge) { 337 338 size_t et = edge->target; 339 double wt = w [v] + edge->wt; 340 341 if (target_set.find (et) == target_set.end ()) 342 { 343 target_set.emplace (et); 344 345 // DGraph has no edge iterator, so prev_edge elements contains 346 // pairwise elements of [from vertex, edge_id] 347 348 if (w [et] == 0.0) // first connection to et 349 { 350 prev_edge [et] = std::vector <size_t> {v, edge->edge_id}; 351 352 sigma [et] = sigma [v]; 353 sigma_edge [edge->edge_id] = sigma [v]; 354 355 w [et] = wt; 356 m_heap->insert (et, wt); 357 } else if (wt < w [et]) 358 { 359 prev_edge [et] = std::vector <size_t> {v, edge->edge_id}; 360 361 sigma [et] = sigma [v]; 362 sigma_edge [edge->edge_id] = sigma [v]; 363 364 w [et] = wt; 365 m_heap->decreaseKey (et, wt); 366 } else if (fabs (wt - w [et]) < epsilon) 367 { 368 std::vector <size_t> edge_vec = prev_edge [et]; 369 edge_vec.push_back (v); 370 edge_vec.push_back (edge->edge_id); 371 prev_edge [et] = edge_vec; 372 373 sigma [et] += sigma [v]; 374 sigma_edge [edge->edge_id] += sigma [v]; 375 } 376 } 377 378 edge = edge->nextOut; 379 } 380 } // end while nItems > 0 381 382 // Then read from the stack and count centrality paths 383 std::vector <double> delta (n, 0.0); 384 while (!v_stack.empty ()) 385 { 386 const size_t v = v_stack.back (); 387 v_stack.pop_back (); 388 std::vector <size_t> edge_vec = prev_edge [v]; 389 double tempd = (1.0 + delta [v]) / static_cast <double> (sigma [v]); 390 391 std::vector <size_t>::iterator it = edge_vec.begin (); 392 // The dereferenced edge_vec iterator is simply a direct index 393 while (it != edge_vec.end ()) 394 { 395 delta [*it] += static_cast <double> (sigma [*it]) * tempd; 396 it = std::next (it); 397 cent [*it] += static_cast <double> (sigma_edge [*it]) * tempd * vert_wt; 398 it = std::next (it); 399 } 400 } 401} 402 403 404//' rcpp_centrality - parallel function 405//' 406//' sample is used to estimate timing, by calculating centrality from just a few 407//' vertices. 408//' @noRd 409// [[Rcpp::export]] 410Rcpp::NumericVector rcpp_centrality (const Rcpp::DataFrame graph, 411 const Rcpp::DataFrame vert_map_in, 412 const std::string& heap_type, 413 const double dist_threshold, 414 const bool edge_centrality, 415 const int sample) 416{ 417 std::vector <std::string> from = graph ["from"]; 418 std::vector <std::string> to = graph ["to"]; 419 std::vector <double> dist = graph ["d"]; 420 std::vector <double> wt = graph ["d_weighted"]; 421 422 const size_t nedges = static_cast <size_t> (graph.nrow ()); 423 std::map <std::string, size_t> vert_map; 424 std::vector <std::string> vert_map_id = vert_map_in ["vert"]; 425 std::vector <size_t> vert_map_n = vert_map_in ["id"]; 426 size_t nverts = run_sp::make_vert_map (vert_map_in, vert_map_id, 427 vert_map_n, vert_map); 428 429 Rcpp::CharacterVector v_nms = vert_map_in.attr ("names"); 430 std::vector <double> vert_wts (static_cast <size_t> (vert_map_in.nrow ()), 1.0); 431 for (auto n: v_nms) { 432 if (n == "vert_wts") { 433 std::vector <double> tempd = vert_map_in ["vert_wts"]; 434 std::copy (tempd.begin (), tempd.end (), vert_wts.begin ()); 435 break; 436 } 437 } 438 439 std::shared_ptr <DGraph> g = std::make_shared <DGraph> (nverts); 440 inst_graph (g, nedges, vert_map, from, to, dist, wt); 441 442 size_t nverts_to_use = nverts; 443 if (sample > 0) 444 nverts_to_use = static_cast <size_t> (sample); 445 446 std::vector <double> result; 447 if (edge_centrality) 448 { 449 OneCentralityEdge one_centrality (nverts, nedges, heap_type, 450 vert_wts, dist_threshold, g); 451 452 RcppParallel::parallelReduce (0, nverts_to_use, one_centrality); 453 result = one_centrality.output; 454 } else // vertex centrality 455 { 456 OneCentralityVert one_centrality (nverts, heap_type, vert_wts, 457 dist_threshold, g); 458 459 RcppParallel::parallelReduce (0, nverts_to_use, one_centrality); 460 result = one_centrality.output; 461 } 462 463 return Rcpp::wrap (result); 464}