Distances on Directed Graphs in R
1#include "run_sp.h"
2#include "pathfinders.h"
3#include "heaps/heap.h"
4
5#include <algorithm> // std::fill
6#include <fstream> // file output for parallel jobs
7#include <unordered_set>
8
9/*************************************************************************
10 * Direct implementation of
11 * "A Faster Algorithm for Betweenness Centrality", Ulrik Brandes (2001)
12 * Journal of Mathematical Sociology 25(2):163-177
13 * - same algorithm as used in igraph and networkx
14 *************************************************************************/
15
16const double epsilon = DBL_MIN; // edge weight comparison == 0
17// see
18// https://github.com/igraph/igraph/blob/96c2cc751063bf3ba7e920e99793956013cef6b5/include/igraph_nongraph.h#L41
19// which defines epsilon as 1e-10. That value is passed to `igraph_cmp_epsilon`,
20// which is defined at
21// https://github.com/igraph/igraph/blob/96c2cc751063bf3ba7e920e99793956013cef6b5/src/math/utils.c#L108
22// and uses a comparison of fabs (diff) < (eps * DBL_MIN),
23// where DBL_MIN is the standard lower, non-zero limit
24// https://en.cppreference.com/w/cpp/types/numeric_limits/min
25// of generally around 1e-308, so (eps * DBL_MIN) is then sub-normal. This code
26// just uses an epsilon equal to DBL_MIN.
27
28// # nocov start
29template <typename T>
30void inst_graph (std::shared_ptr<DGraph> g, size_t nedges,
31 const std::map <std::string, size_t>& vert_map,
32 const std::vector <std::string>& from,
33 const std::vector <std::string>& to,
34 const std::vector <T>& dist,
35 const std::vector <T>& wt)
36{
37
38 std::unordered_set < std::pair <size_t, size_t>, centrality::edge_pair_hash > edge_set;
39
40 for (size_t i = 0; i < nedges; ++i)
41 {
42 size_t fromi = vert_map.at(from [i]);
43 size_t toi = vert_map.at(to [i]);
44
45 std::pair <size_t, size_t> edge_pair {fromi, toi};
46 if (edge_set.find (edge_pair) == edge_set.end ())
47 {
48 edge_set.emplace (edge_pair);
49 g->addNewEdge (fromi, toi, dist [i], wt [i], i);
50 }
51
52 }
53}
54// # nocov end
55
56struct OneCentralityVert : public RcppParallel::Worker
57{
58 size_t nverts; // can't be const because of reinterpret case
59 const std::string heap_type;
60 const std::vector <double> vert_wts;
61 const double dist_threshold;
62 std::shared_ptr <DGraph> g;
63
64 std::vector <double> output;
65
66 // Constructor 1: The main constructor
67 OneCentralityVert (
68 const size_t nverts_in,
69 const std::string heap_type_in,
70 const std::vector <double> vert_wts_in,
71 const double dist_threshold_in,
72 const std::shared_ptr <DGraph> g_in) :
73 nverts (nverts_in), heap_type (heap_type_in),
74 vert_wts (vert_wts_in), dist_threshold (dist_threshold_in),
75 g (g_in), output ()
76 {
77 output.resize (nverts, 0.0);
78 }
79
80 // Constructor 2: The Split constructor
81 OneCentralityVert (
82 const OneCentralityVert &oneCentralityVert,
83 RcppParallel::Split) :
84 nverts (oneCentralityVert.nverts),
85 heap_type (oneCentralityVert.heap_type),
86 vert_wts (oneCentralityVert.vert_wts),
87 dist_threshold (oneCentralityVert.dist_threshold),
88 g (oneCentralityVert.g), output ()
89 {
90 output.resize (nverts, 0.0);
91 }
92
93
94 // Parallel function operator
95 void operator() (size_t begin, size_t end)
96 {
97 std::shared_ptr<PF::PathFinder> pathfinder =
98 std::make_shared <PF::PathFinder> (nverts,
99 *run_sp::getHeapImpl (heap_type), g);
100
101 std::vector <double> cent (nverts, 0.0);
102
103 for (size_t v = begin; v < end; v++)
104 {
105 if (RcppThread::isInterrupted (v % static_cast<int>(100) == 0))
106 return;
107 const double vwt = vert_wts [v];
108 pathfinder->Centrality_vertex (cent,
109 static_cast <size_t> (v),
110 vwt, dist_threshold);
111 }
112
113 for (size_t i = 0; i < nverts; i++)
114 output [i] += cent [i];
115 }
116
117 void join (const OneCentralityVert &rhs)
118 {
119 for (size_t i = 0; i < output.size (); i++)
120 output [i] += rhs.output [i];
121 }
122};
123
124struct OneCentralityEdge : public RcppParallel::Worker
125{
126 size_t nverts; // can't be const because of reinterpret case
127 size_t nedges;
128 const std::string heap_type;
129 const std::vector <double> vert_wts;
130 const double dist_threshold;
131 std::shared_ptr <DGraph> g;
132
133 std::vector <double> output;
134
135 // Constructor 1: The main constructor
136 OneCentralityEdge (
137 const size_t nverts_in,
138 const size_t nedges_in,
139 const std::string heap_type_in,
140 const std::vector <double> vert_wts_in,
141 const double dist_threshold_in,
142 const std::shared_ptr <DGraph> g_in) :
143 nverts (nverts_in), nedges (nedges_in),
144 heap_type (heap_type_in), vert_wts (vert_wts_in),
145 dist_threshold (dist_threshold_in),
146 g (g_in), output ()
147 {
148 output.resize (nedges, 0.0);
149 }
150
151 // Constructor 2: The Split constructor
152 OneCentralityEdge (
153 const OneCentralityEdge &oneCentralityEdge,
154 RcppParallel::Split) :
155 nverts (oneCentralityEdge.nverts),
156 nedges (oneCentralityEdge.nedges),
157 heap_type (oneCentralityEdge.heap_type),
158 vert_wts (oneCentralityEdge.vert_wts),
159 dist_threshold (oneCentralityEdge.dist_threshold),
160 g (oneCentralityEdge.g),
161 output ()
162 {
163 output.resize (nedges, 0.0);
164 }
165
166 // Parallel function operator
167 void operator() (size_t begin, size_t end)
168 {
169 std::shared_ptr<PF::PathFinder> pathfinder =
170 std::make_shared <PF::PathFinder> (nverts,
171 *run_sp::getHeapImpl (heap_type), g);
172
173 std::vector <double> cent (nedges, 0.0);
174
175 for (size_t v = begin; v < end; v++)
176 {
177 if (RcppThread::isInterrupted (v % static_cast<int>(100) == 0))
178 return;
179 const double vwt = vert_wts [v];
180 pathfinder->Centrality_edge (cent, v, vwt, nedges, dist_threshold);
181 }
182 for (size_t i = 0; i < nedges; i++)
183 output [i] += cent [i];
184 }
185
186 void join (const OneCentralityEdge &rhs)
187 {
188 for (size_t i = 0; i < output.size (); i++)
189 output [i] += rhs.output [i];
190 }
191};
192
193void PF::PathFinder::Centrality_vertex (
194 std::vector <double>& cent,
195 const size_t s,
196 const double vert_wt,
197 const double dist_threshold)
198{
199 const DGraphEdge *edge;
200
201 const size_t n = m_graph->nVertices();
202 const std::vector <DGraphVertex>& vertices = m_graph->vertices();
203
204 std::deque <size_t> v_stack;
205
206 std::vector <double> w (n, 0.0);
207 w [s] = 1.0;
208
209 m_heap->insert (s, -1.0);
210
211 // sigma as long int because graphs can be bigger than INT_MAX
212 std::vector <long int> sigma (n, 0);
213 sigma [s] = 1L;
214
215 std::vector <std::vector <size_t> > prev_vert (n);
216
217 while (m_heap->nItems() > 0) {
218 size_t v = m_heap->deleteMin();
219
220 if (w [v] > dist_threshold)
221 continue;
222
223 v_stack.push_back (v);
224
225 edge = vertices [v].outHead;
226
227 // NOTE: This 'target_set', and the one below in 'centrality_edge', are
228 // only needed for graphs which are submitted with (potentially)
229 // duplicate edges. The set is used to ensure centrality values are only
230 // aggregated once along any set of duplicated edges. The effect of not
231 // doing this is documented at
232 // https://github.com/ATFutures/dodgr/issues/186.
233 //
234 // Nevertheless, the other commits flagged in that issue add a function
235 // to the internal 'inst_graph' function at the top of this file ensures
236 // no duplicated edges are added, so this 'target_set' is not actually
237 // needed. The issue shows that it does not decrease computational
238 // efficiency here, so it's left for the moment, but can easily be
239 // removed later.
240 std::unordered_set <size_t> target_set;
241
242 while (edge) {
243
244 size_t et = edge->target;
245 double wt = w [v] + edge->wt;
246
247 if (target_set.find (et) == target_set.end ())
248 {
249 target_set.emplace (et);
250
251 if (w [et] == 0.0) // first connection to et
252 {
253 prev_vert [et] = std::vector <size_t> (1L, v);
254
255 sigma [et] = sigma [v];
256 w [et] = wt;
257 m_heap->insert (et, wt);
258 } else if (wt < w [et])
259 {
260 prev_vert [et] = std::vector <size_t> (1L, v);
261
262 sigma [et] = sigma [v];
263 w [et] = wt;
264 m_heap->decreaseKey (et, wt);
265 } else if (fabs (wt - w [et]) < epsilon)
266 {
267 std::vector <size_t> vert_vec = prev_vert [et];
268 vert_vec.push_back (v);
269 prev_vert [et] = vert_vec;
270
271 sigma [et] += sigma [v];
272 }
273 }
274
275 edge = edge->nextOut;
276 }
277 } // end while nItems > 0
278
279 // Then read from the stack and count centrality paths
280 std::vector <double> delta (n, 0.0);
281 while (!v_stack.empty ())
282 {
283 const size_t v = v_stack.back ();
284 v_stack.pop_back ();
285 std::vector <size_t> vert_vec = prev_vert [v];
286 double tempd = (1.0 + delta [v]) / static_cast <double> (sigma [v]);
287 for (auto ws: vert_vec)
288 {
289 delta [ws] += static_cast <double> (sigma [ws]) * tempd;
290 }
291 if (v != s)
292 cent [v] += vert_wt * delta [v];
293 }
294}
295
296
297void PF::PathFinder::Centrality_edge (
298 std::vector <double>& cent,
299 const size_t s,
300 const double vert_wt,
301 const size_t nedges,
302 const double dist_threshold)
303{
304 const DGraphEdge *edge;
305
306 const size_t n = m_graph->nVertices();
307 const std::vector <DGraphVertex>& vertices = m_graph->vertices();
308
309 std::deque <size_t> v_stack;
310
311 std::vector <double> w (n, 0.0);
312 w [s] = 1.0;
313
314 m_heap->insert (s, -1.0);
315
316 std::vector <long int> sigma (n, 0);
317 sigma [s] = 1L;
318 std::vector <long int> sigma_edge (nedges, 0);
319
320 std::vector <std::vector <size_t> > prev_vert (n), prev_edge (n);
321
322 while (m_heap->nItems() > 0) {
323
324 size_t v = m_heap->deleteMin();
325
326 if (w [v] > dist_threshold)
327 continue;
328
329 v_stack.push_back (v);
330
331 edge = vertices [v].outHead;
332
333 // See comment in 'centrality_vertex', above, about this 'target_set'.
334 std::unordered_set <size_t> target_set;
335
336 while (edge) {
337
338 size_t et = edge->target;
339 double wt = w [v] + edge->wt;
340
341 if (target_set.find (et) == target_set.end ())
342 {
343 target_set.emplace (et);
344
345 // DGraph has no edge iterator, so prev_edge elements contains
346 // pairwise elements of [from vertex, edge_id]
347
348 if (w [et] == 0.0) // first connection to et
349 {
350 prev_edge [et] = std::vector <size_t> {v, edge->edge_id};
351
352 sigma [et] = sigma [v];
353 sigma_edge [edge->edge_id] = sigma [v];
354
355 w [et] = wt;
356 m_heap->insert (et, wt);
357 } else if (wt < w [et])
358 {
359 prev_edge [et] = std::vector <size_t> {v, edge->edge_id};
360
361 sigma [et] = sigma [v];
362 sigma_edge [edge->edge_id] = sigma [v];
363
364 w [et] = wt;
365 m_heap->decreaseKey (et, wt);
366 } else if (fabs (wt - w [et]) < epsilon)
367 {
368 std::vector <size_t> edge_vec = prev_edge [et];
369 edge_vec.push_back (v);
370 edge_vec.push_back (edge->edge_id);
371 prev_edge [et] = edge_vec;
372
373 sigma [et] += sigma [v];
374 sigma_edge [edge->edge_id] += sigma [v];
375 }
376 }
377
378 edge = edge->nextOut;
379 }
380 } // end while nItems > 0
381
382 // Then read from the stack and count centrality paths
383 std::vector <double> delta (n, 0.0);
384 while (!v_stack.empty ())
385 {
386 const size_t v = v_stack.back ();
387 v_stack.pop_back ();
388 std::vector <size_t> edge_vec = prev_edge [v];
389 double tempd = (1.0 + delta [v]) / static_cast <double> (sigma [v]);
390
391 std::vector <size_t>::iterator it = edge_vec.begin ();
392 // The dereferenced edge_vec iterator is simply a direct index
393 while (it != edge_vec.end ())
394 {
395 delta [*it] += static_cast <double> (sigma [*it]) * tempd;
396 it = std::next (it);
397 cent [*it] += static_cast <double> (sigma_edge [*it]) * tempd * vert_wt;
398 it = std::next (it);
399 }
400 }
401}
402
403
404//' rcpp_centrality - parallel function
405//'
406//' sample is used to estimate timing, by calculating centrality from just a few
407//' vertices.
408//' @noRd
409// [[Rcpp::export]]
410Rcpp::NumericVector rcpp_centrality (const Rcpp::DataFrame graph,
411 const Rcpp::DataFrame vert_map_in,
412 const std::string& heap_type,
413 const double dist_threshold,
414 const bool edge_centrality,
415 const int sample)
416{
417 std::vector <std::string> from = graph ["from"];
418 std::vector <std::string> to = graph ["to"];
419 std::vector <double> dist = graph ["d"];
420 std::vector <double> wt = graph ["d_weighted"];
421
422 const size_t nedges = static_cast <size_t> (graph.nrow ());
423 std::map <std::string, size_t> vert_map;
424 std::vector <std::string> vert_map_id = vert_map_in ["vert"];
425 std::vector <size_t> vert_map_n = vert_map_in ["id"];
426 size_t nverts = run_sp::make_vert_map (vert_map_in, vert_map_id,
427 vert_map_n, vert_map);
428
429 Rcpp::CharacterVector v_nms = vert_map_in.attr ("names");
430 std::vector <double> vert_wts (static_cast <size_t> (vert_map_in.nrow ()), 1.0);
431 for (auto n: v_nms) {
432 if (n == "vert_wts") {
433 std::vector <double> tempd = vert_map_in ["vert_wts"];
434 std::copy (tempd.begin (), tempd.end (), vert_wts.begin ());
435 break;
436 }
437 }
438
439 std::shared_ptr <DGraph> g = std::make_shared <DGraph> (nverts);
440 inst_graph (g, nedges, vert_map, from, to, dist, wt);
441
442 size_t nverts_to_use = nverts;
443 if (sample > 0)
444 nverts_to_use = static_cast <size_t> (sample);
445
446 std::vector <double> result;
447 if (edge_centrality)
448 {
449 OneCentralityEdge one_centrality (nverts, nedges, heap_type,
450 vert_wts, dist_threshold, g);
451
452 RcppParallel::parallelReduce (0, nverts_to_use, one_centrality);
453 result = one_centrality.output;
454 } else // vertex centrality
455 {
456 OneCentralityVert one_centrality (nverts, heap_type, vert_wts,
457 dist_threshold, g);
458
459 RcppParallel::parallelReduce (0, nverts_to_use, one_centrality);
460 result = one_centrality.output;
461 }
462
463 return Rcpp::wrap (result);
464}