Distances on Directed Graphs in R
1
2#include "run_sp.h"
3#include "flows.h"
4
5#include "dgraph.h"
6#include "heaps/heap_lib.h"
7
8template <typename T>
9void inst_graph (std::shared_ptr<DGraph> g, size_t nedges,
10 const std::map <std::string, size_t>& vert_map,
11 const std::vector <std::string>& from,
12 const std::vector <std::string>& to,
13 const std::vector <T>& dist,
14 const std::vector <T>& wt)
15{
16 for (size_t i = 0; i < nedges; ++i)
17 {
18 size_t fromi = vert_map.at(from [i]);
19 size_t toi = vert_map.at(to [i]);
20 g->addNewEdge (fromi, toi, dist [i], wt [i], i);
21 }
22}
23
24struct OneAggregate : public RcppParallel::Worker
25{
26 RcppParallel::RVector <int> dp_fromi;
27 const std::vector <size_t> toi;
28 const RcppParallel::RMatrix <double> flows;
29 const std::vector <std::string> vert_name;
30 const std::unordered_map <std::string, size_t> verts_to_edge_map;
31 size_t nverts; // can't be const because of reinterpret cast
32 size_t nedges;
33 const bool norm_sums;
34 const double tol;
35 const std::string heap_type;
36 std::shared_ptr <DGraph> g;
37
38 std::vector <double> output;
39
40 // Constructor 1: The main constructor
41 OneAggregate (
42 const RcppParallel::RVector <int> fromi,
43 const std::vector <size_t> toi_in,
44 const RcppParallel::RMatrix <double> flows_in,
45 const std::vector <std::string> vert_name_in,
46 const std::unordered_map <std::string, size_t> verts_to_edge_map_in,
47 const size_t nverts_in,
48 const size_t nedges_in,
49 const bool norm_sums_in,
50 const double tol_in,
51 const std::string &heap_type_in,
52 const std::shared_ptr <DGraph> g_in) :
53 dp_fromi (fromi), toi (toi_in), flows (flows_in),
54 vert_name (vert_name_in),
55 verts_to_edge_map (verts_to_edge_map_in),
56 nverts (nverts_in), nedges (nedges_in), norm_sums (norm_sums_in),
57 tol (tol_in), heap_type (heap_type_in), g (g_in), output ()
58 {
59 output.resize (nedges, 0.0);
60 }
61
62 // Constructor 2: The Split constructor
63 OneAggregate (
64 const OneAggregate& oneAggregate,
65 RcppParallel::Split) :
66 dp_fromi (oneAggregate.dp_fromi),
67 toi (oneAggregate.toi),
68 flows (oneAggregate.flows),
69 vert_name (oneAggregate.vert_name),
70 verts_to_edge_map (oneAggregate.verts_to_edge_map),
71 nverts (oneAggregate.nverts),
72 nedges (oneAggregate.nedges),
73 norm_sums (oneAggregate.norm_sums),
74 tol (oneAggregate.tol),
75 heap_type (oneAggregate.heap_type),
76 g (oneAggregate.g), output ()
77 {
78 output.resize (nedges, 0.0);
79 }
80
81 // Parallel function operator
82 void operator() (size_t begin, size_t end)
83 {
84 std::shared_ptr<PF::PathFinder> pathfinder =
85 std::make_shared <PF::PathFinder> (nverts,
86 *run_sp::getHeapImpl (heap_type), g);
87 std::vector <double> w (nverts);
88 std::vector <double> d (nverts);
89 std::vector <long int> prev (nverts);
90
91 for (size_t i = begin; i < end; i++)
92 {
93 //if (RcppThread::isInterrupted (i % static_cast<int>(100) == 0))
94 //if (RcppThread::isInterrupted ())
95 // return;
96
97 // These have to be reserved within the parallel operator function!
98 std::fill (w.begin (), w.end (), INFINITE_DOUBLE);
99 std::fill (d.begin (), d.end (), INFINITE_DOUBLE);
100 std::fill (prev.begin (), prev.end (), INFINITE_INT);
101
102 size_t from_i = static_cast <size_t> (dp_fromi [i]);
103 d [from_i] = w [from_i] = 0.0;
104
105 // reduce toi to only those within tolerance limt
106 double flim = 0.0;
107 size_t nto = toi.size ();
108 if (tol > 0.0) {
109 double fmax = 0.0;
110 for (size_t j = 0; j < static_cast <size_t> (flows.ncol ()); j++)
111 if (flows (i, j) > fmax)
112 fmax = flows (i, j);
113 flim = fmax * tol;
114 size_t nto = 0;
115 for (size_t j = 0; j < static_cast <size_t> (flows.ncol ()); j++)
116 if (flows (i, j) > flim)
117 nto++;
118
119 if (nto == 0)
120 continue;
121 }
122
123 // toi_index is into cols of flow matrix
124 // toi_reduced is into the vertex vectors (d, w, prev)
125 std::vector <size_t> toi_reduced, toi_index;
126 toi_reduced.reserve (nto);
127 toi_index.reserve (nto);
128 for (size_t j = 0; j < static_cast <size_t> (flows.ncol ()); j++)
129 if (flows (i, j) > flim)
130 {
131 toi_index.push_back (static_cast <size_t> (j));
132 toi_reduced.push_back (toi [j]);
133 }
134
135 pathfinder->Dijkstra (d, w, prev, from_i, toi_reduced);
136 for (size_t j = 0; j < toi_reduced.size (); j++)
137 {
138 if (from_i != toi_reduced [j]) // Exclude self-flows
139 {
140 double flow_ij = flows (i, toi_index [j]);
141 if (w [toi_reduced [j]] < INFINITE_DOUBLE && flow_ij > 0.0)
142 {
143 // count how long the path is, so flows on
144 // each edge can be divided by this length
145 int path_len = 1;
146 if (norm_sums)
147 {
148 path_len = 0;
149 long int target_t = static_cast <long int> (toi_reduced [j]);
150 size_t from_t = static_cast <size_t> (dp_fromi [i]);
151 while (target_t < INFINITE_INT)
152 {
153 path_len++;
154 size_t target_size_t = static_cast <size_t> (target_t);
155 target_t = prev [target_size_t];
156 if (target_t < 0 || target_size_t == from_t)
157 break;
158 }
159 }
160
161 long int target = static_cast <int> (toi_reduced [j]); // can equal -1
162 while (target < INFINITE_INT)
163 {
164 size_t stt = static_cast <size_t> (target);
165 if (prev [stt] >= 0 && prev [stt] < INFINITE_INT)
166 {
167 std::string v2 = "f" +
168 vert_name [static_cast <size_t> (prev [stt])] +
169 "t" + vert_name [stt];
170 output [verts_to_edge_map.at (v2)] +=
171 flow_ij / static_cast <double> (path_len);
172 }
173
174 target = prev [stt];
175 // Only allocate that flow from origin vertex v to all
176 // previous vertices up until the target vi
177 if (target < 0L || target == dp_fromi [i])
178 {
179 break;
180 }
181 }
182 }
183 }
184 }
185 } // end for i
186 } // end parallel function operator
187
188 void join (const OneAggregate &rhs)
189 {
190 for (size_t i = 0; i < output.size (); i++)
191 output [i] += rhs.output [i];
192 }
193};
194
195
196struct OneAggregatePaired : public RcppParallel::Worker
197{
198 RcppParallel::RVector <int> dp_fromtoi;
199 const RcppParallel::RVector <double> flows;
200 const std::vector <std::string> vert_name;
201 const std::unordered_map <std::string, size_t> verts_to_edge_map;
202 size_t nfrom;
203 size_t nverts; // can't be const because of reinterpret cast
204 size_t nedges;
205 const bool norm_sums;
206 const double tol;
207 const std::string heap_type;
208 std::shared_ptr <DGraph> g;
209
210 std::vector <double> output;
211
212 // Constructor 1: The main constructor
213 OneAggregatePaired (
214 const RcppParallel::RVector <int> fromtoi,
215 const RcppParallel::RVector <double> flows_in,
216 const std::vector <std::string> vert_name_in,
217 const std::unordered_map <std::string, size_t> verts_to_edge_map_in,
218 const size_t nfrom_in,
219 const size_t nverts_in,
220 const size_t nedges_in,
221 const bool norm_sums_in,
222 const double tol_in,
223 const std::string &heap_type_in,
224 const std::shared_ptr <DGraph> g_in
225 ) :
226 dp_fromtoi (fromtoi), flows (flows_in),
227 vert_name (vert_name_in),
228 verts_to_edge_map (verts_to_edge_map_in),
229 nfrom (nfrom_in), nverts (nverts_in), nedges (nedges_in),
230 norm_sums (norm_sums_in), tol (tol_in), heap_type (heap_type_in),
231 g (g_in), output ()
232 {
233 output.resize (nedges, 0.0);
234 }
235
236 // Constructor 2: The Split constructor
237 OneAggregatePaired (
238 const OneAggregatePaired& oneAggregatePaired,
239 RcppParallel::Split) :
240 dp_fromtoi (oneAggregatePaired.dp_fromtoi),
241 flows (oneAggregatePaired.flows),
242 vert_name (oneAggregatePaired.vert_name),
243 verts_to_edge_map (oneAggregatePaired.verts_to_edge_map),
244 nfrom (oneAggregatePaired.nfrom),
245 nverts (oneAggregatePaired.nverts),
246 nedges (oneAggregatePaired.nedges),
247 norm_sums (oneAggregatePaired.norm_sums),
248 tol (oneAggregatePaired.tol),
249 heap_type (oneAggregatePaired.heap_type),
250 g (oneAggregatePaired.g), output ()
251 {
252 output.resize (nedges, 0.0);
253 }
254
255 // Parallel function operator
256 void operator() (size_t begin, size_t end)
257 {
258 std::shared_ptr<PF::PathFinder> pathfinder =
259 std::make_shared <PF::PathFinder> (nverts,
260 *run_sp::getHeapImpl (heap_type), g);
261 std::vector <double> w (nverts);
262 std::vector <double> d (nverts);
263 std::vector <long int> prev (nverts);
264
265 for (size_t i = begin; i < end; i++)
266 {
267 //if (RcppThread::isInterrupted (i % static_cast<int>(100) == 0))
268 //if (RcppThread::isInterrupted ())
269 // return;
270
271 const size_t from_i = static_cast <size_t> (dp_fromtoi [i]);
272 // to_i has to be a vector for Dijkstra algo, but has only 1
273 // element.
274 const std::vector <size_t> to_i = {static_cast <size_t> (dp_fromtoi [nfrom + i])};
275
276 // These have to be reserved within the parallel operator function!
277 std::fill (w.begin (), w.end (), INFINITE_DOUBLE);
278 std::fill (d.begin (), d.end (), INFINITE_DOUBLE);
279 std::fill (prev.begin (), prev.end (), INFINITE_INT);
280
281 d [from_i] = w [from_i] = 0.0;
282
283 pathfinder->Dijkstra (d, w, prev, from_i, to_i);
284 // loop has always only one element here:
285 for (size_t j = 0; j < to_i.size (); j++)
286 {
287 if (w [to_i [j]] < INFINITE_DOUBLE && flows [i] > 0.0)
288 {
289 // count how long the path is, so flows on
290 // each edge can be divided by this length
291 int path_len = 1;
292 if (norm_sums)
293 {
294 path_len = 0;
295 long int target_t = static_cast <long int> (to_i [j]);
296 while (target_t < INFINITE_INT)
297 {
298 path_len++;
299 size_t target_size_t = static_cast <size_t> (target_t);
300 target_t = prev [target_size_t];
301 if (target_t < 0 || target_size_t == from_i)
302 break;
303 }
304 }
305
306 long int target = static_cast <int> (to_i [j]); // can equal -1
307 while (target < INFINITE_INT)
308 {
309 size_t stt = static_cast <size_t> (target);
310 if (prev [stt] >= 0 && prev [stt] < INFINITE_INT)
311 {
312 std::string v2 = "f" +
313 vert_name [static_cast <size_t> (prev [stt])] +
314 "t" + vert_name [stt];
315 output [verts_to_edge_map.at (v2)] +=
316 flows [i] / static_cast <double> (path_len);
317 }
318
319 target = prev [stt];
320 // Only allocate that flow from origin vertex v to all
321 // previous vertices up until the target vi
322 if (target < 0L || target == from_i)
323 {
324 break;
325 }
326 }
327 }
328 }
329 } // end for i
330 } // end parallel function operator
331
332 void join (const OneAggregatePaired &rhs)
333 {
334 for (size_t i = 0; i < output.size (); i++)
335 output [i] += rhs.output [i];
336 }
337};
338
339
340struct OneDisperse : public RcppParallel::Worker
341{
342 RcppParallel::RVector <int> dp_fromi;
343 const RcppParallel::RVector <double> dens;
344 const std::vector <std::string> vert_name;
345 const std::unordered_map <std::string, size_t> verts_to_edge_map;
346 size_t nverts; // can't be const because of reinterpret cast
347 size_t nedges;
348 const RcppParallel::RVector <double> kfrom;
349 const double tol;
350 const std::string heap_type;
351 std::shared_ptr <DGraph> g;
352
353 std::vector <double> output;
354
355 // Constructor 1: The main constructor
356 OneDisperse (
357 const RcppParallel::RVector <int> fromi,
358 const RcppParallel::RVector <double> dens_in,
359 const std::vector <std::string> vert_name_in,
360 const std::unordered_map <std::string, size_t> verts_to_edge_map_in,
361 const size_t nverts_in,
362 const size_t nedges_in,
363 const RcppParallel::RVector <double> kfrom_in,
364 const double tol_in,
365 const std::string &heap_type_in,
366 const std::shared_ptr <DGraph> g_in) :
367 dp_fromi (fromi), dens (dens_in), vert_name (vert_name_in),
368 verts_to_edge_map (verts_to_edge_map_in),
369 nverts (nverts_in), nedges (nedges_in), kfrom (kfrom_in),
370 tol (tol_in), heap_type (heap_type_in), g (g_in), output ()
371 {
372 const R_xlen_t nfrom = static_cast <R_xlen_t> (dens.size ());
373 const R_xlen_t nk = static_cast <R_xlen_t> (kfrom.size ()) / nfrom;
374 const size_t out_size = nedges * static_cast <size_t> (nk);
375 output.resize (out_size, 0.0);
376 }
377
378 // Constructor 2: The Split constructor
379 OneDisperse (
380 const OneDisperse& oneDisperse,
381 RcppParallel::Split) :
382 dp_fromi (oneDisperse.dp_fromi), dens (oneDisperse.dens),
383 vert_name (oneDisperse.vert_name),
384 verts_to_edge_map (oneDisperse.verts_to_edge_map),
385 nverts (oneDisperse.nverts), nedges (oneDisperse.nedges),
386 kfrom (oneDisperse.kfrom), tol (oneDisperse.tol),
387 heap_type (oneDisperse.heap_type), g (oneDisperse.g), output ()
388 {
389 const R_xlen_t nfrom = static_cast <R_xlen_t> (dens.size ());
390 const R_xlen_t nk = static_cast <R_xlen_t> (kfrom.size ()) / nfrom;
391 size_t out_size = nedges * static_cast <size_t> (nk);
392 output.resize (out_size, 0.0);
393 }
394
395
396 // Parallel function operator
397 void operator() (size_t begin, size_t end)
398 {
399 std::shared_ptr<PF::PathFinder> pathfinder =
400 std::make_shared <PF::PathFinder> (nverts,
401 *run_sp::getHeapImpl (heap_type), g);
402 std::vector <double> w (nverts);
403 std::vector <double> d (nverts);
404 std::vector <long int> prev (nverts);
405
406 const size_t nfrom = dens.size ();
407 const size_t nk = kfrom.size () / nfrom;
408
409 for (size_t i = begin; i < end; i++) // over the from vertices
410 {
411 //if (RcppThread::isInterrupted (i % static_cast<int>(10) == 0))
412 if (RcppThread::isInterrupted ())
413 return;
414
415 // translate k-value to distance limit based on tol
416 // exp(-d / k) = tol -> d = -k * log (tol)
417 // k_from holds nk vectors of different k-values, each of length
418 // nedges.
419 double dlim = 0.0;
420 for (size_t k = 0; k < nk; k++)
421 if (kfrom [i + k * nfrom] > dlim)
422 dlim = kfrom [i + k * nfrom]; // dlim is max k-value
423 dlim = -dlim * log (tol); // converted to actual dist limit.
424
425 std::fill (w.begin (), w.end (), INFINITE_DOUBLE);
426 std::fill (d.begin (), d.end (), INFINITE_DOUBLE);
427 std::fill (prev.begin (), prev.end (), INFINITE_INT);
428
429 const size_t from_i = static_cast <size_t> (dp_fromi [i]);
430 d [from_i] = w [from_i] = 0.0;
431
432 pathfinder->DijkstraLimit (d, w, prev, from_i, dlim);
433
434 std::vector <double> flows_i (nedges * nk, 0.0);
435 std::vector <double> expsum (nk, 0.0);
436
437 for (size_t j = 0; j < nverts; j++)
438 {
439 if (prev [j] >= 0 && prev [j] < INFINITE_INT)
440 {
441 const std::string vert_to = vert_name [j],
442 vert_from = vert_name [static_cast <size_t> (prev [j])];
443 const std::string two_verts = "f" + vert_from + "t" + vert_to;
444
445 size_t index = verts_to_edge_map.at (two_verts);
446 if (d [j] < INFINITE_DOUBLE)
447 {
448 for (size_t k = 0; k < nk; k++)
449 {
450 double exp_jk;
451 if (kfrom [i + k * nfrom] > 0.0)
452 exp_jk = exp (-d [j] / kfrom [i + k * nfrom]);
453 else
454 {
455 // standard logistic polygonal for UK cycling
456 // models
457 double lp = -3.894 + (-0.5872 * d [j]) +
458 (1.832 * sqrt (d [j])) +
459 (0.007956 * d [j] * d [j]);
460 exp_jk = exp (lp) / (1.0 + exp (lp));
461 }
462 const size_t k_st = static_cast <size_t> (k);
463 expsum [k_st] += exp_jk;
464 flows_i [index + k_st * nedges] += dens [i] * exp_jk;
465 }
466 }
467 }
468 } // end for j
469 for (size_t k = 0; k < nk; k++)
470 if (expsum [k] > tol)
471 for (size_t j = 0; j < nedges; j++)
472 output [j + k * nedges] +=
473 flows_i [j + k * nedges] / expsum [k];
474 } // end for i
475 } // end parallel function operator
476
477 void join (const OneDisperse &rhs)
478 {
479 for (size_t i = 0; i < output.size (); i++)
480 output [i] += rhs.output [i];
481 }
482};
483
484struct OneSI : public RcppParallel::Worker
485{
486 RcppParallel::RVector <int> dp_fromi;
487 const std::vector <size_t> toi;
488 const RcppParallel::RVector <double> k_from;
489 const RcppParallel::RVector <double> dens_from;
490 const RcppParallel::RVector <double> dens_to;
491 const std::vector <std::string> vert_name;
492 const std::unordered_map <std::string, size_t> verts_to_edge_map;
493 size_t nverts; // can't be const because of reinterpret cast
494 size_t nedges;
495 const bool norm_sums;
496 const double tol;
497 const std::string heap_type;
498 std::shared_ptr <DGraph> g;
499
500 std::vector <double> output;
501
502 // Constructor 1: The main constructor
503 OneSI (
504 const RcppParallel::RVector <int> fromi,
505 const std::vector <size_t> toi_in,
506 const RcppParallel::RVector <double> k_from_in,
507 const RcppParallel::RVector <double> dens_from_in,
508 const RcppParallel::RVector <double> dens_to_in,
509 const std::vector <std::string> vert_name_in,
510 const std::unordered_map <std::string, size_t> verts_to_edge_map_in,
511 const size_t nverts_in,
512 const size_t nedges_in,
513 const bool norm_sums_in,
514 const double tol_in,
515 const std::string &heap_type_in,
516 const std::shared_ptr <DGraph> g_in) :
517 dp_fromi (fromi), toi (toi_in), k_from (k_from_in),
518 dens_from (dens_from_in), dens_to (dens_to_in),
519 vert_name (vert_name_in), verts_to_edge_map (verts_to_edge_map_in),
520 nverts (nverts_in), nedges (nedges_in), norm_sums (norm_sums_in),
521 tol (tol_in), heap_type (heap_type_in), g (g_in), output ()
522 {
523 const size_t nfrom = dens_from.size ();
524 const size_t nk = k_from.size () / nfrom;
525 const size_t out_size = nedges * nk;
526 output.resize (out_size, 0.0);
527 }
528
529 // Constructor 2: The Split constructor
530 OneSI (
531 const OneSI& oneSI,
532 RcppParallel::Split) :
533 dp_fromi (oneSI.dp_fromi), toi (oneSI.toi), k_from (oneSI.k_from),
534 dens_from (oneSI.dens_from), dens_to (oneSI.dens_to),
535 vert_name (oneSI.vert_name), verts_to_edge_map (oneSI.verts_to_edge_map),
536 nverts (oneSI.nverts), nedges (oneSI.nedges),
537 norm_sums (oneSI.norm_sums), tol (oneSI.tol),
538 heap_type (oneSI.heap_type), g (oneSI.g), output ()
539 {
540 const size_t nfrom = dens_from.size ();
541 const size_t nk = k_from.size () / nfrom;
542 const size_t out_size = nedges * nk;
543 output.resize (out_size, 0.0);
544 }
545
546 // Parallel function operator
547 void operator() (size_t begin, size_t end)
548 {
549 std::shared_ptr<PF::PathFinder> pathfinder =
550 std::make_shared <PF::PathFinder> (nverts,
551 *run_sp::getHeapImpl (heap_type), g);
552 std::vector <double> w (nverts);
553 std::vector <double> d (nverts);
554 std::vector <long int> prev (nverts);
555
556 // k_from can have multiple vectors of k-values, each equal in length to
557 // the number of 'from' points. The output is then a single vector of
558 // 'nedges' wrapped 'nk' times.
559 const size_t nfrom = dens_from.size ();
560 const size_t nk = k_from.size () / nfrom;
561
562 for (size_t i = begin; i < end; i++)
563 {
564 //if (RcppThread::isInterrupted (i % static_cast<int>(100) == 0))
565 if (RcppThread::isInterrupted ())
566 return;
567
568 // These have to be reserved within the parallel operator function!
569 std::fill (w.begin (), w.end (), INFINITE_DOUBLE);
570 std::fill (d.begin (), d.end (), INFINITE_DOUBLE);
571 std::fill (prev.begin (), prev.end (), INFINITE_INT);
572
573 size_t from_i = static_cast <size_t> (dp_fromi [i]);
574 d [from_i] = w [from_i] = 0.0;
575
576 // translate k-value to distance limit based on tol
577 // exp(-d / k) = tol -> d = -k * log (tol)
578 // const double dlim = -k_from [i] * log (tol);
579 // k_from holds nk vectors of different k-values, each of length
580 // nedges.
581 double dlim = 0.0;
582 for (size_t k = 0; k < nk; k++)
583 if (k_from [i + k * nfrom] > dlim)
584 dlim = k_from [i + k * nfrom];
585 dlim = -dlim * log (tol);
586
587 pathfinder->DijkstraLimit (d, w, prev, from_i, dlim);
588
589 std::vector <double> flows_i (nedges * nk, 0.0);
590 std::vector <double> expsum (nk, 0.0);
591 for (size_t j = 0; j < toi.size (); j++)
592 {
593 if (from_i != toi [j]) // Exclude self-flows
594 {
595 if (d [toi [j]] < INFINITE_DOUBLE)
596 {
597 /* Flow between i and j is based on d (i, j), but is
598 * subsequently allocated to all intermediate edges.
599 * The `norm_sums` option re-scales this intermediate
600 * edge allocation such that SI value to one terminal
601 * vertex from the origin vertex of:
602 * N_i N_j exp (-d_{ij}) / sum_j (N_{ij} exp (-d_{ij}))
603 * is divided by the number of intermediate edges
604 * between i and j. This is achieved by initially only
605 * allocating the values of the numerator divided by
606 * numbers of intermediate edges, while accumulating the
607 * denominator independent of that value in order to
608 * divide all final values.
609 */
610 std::vector <double> exp_jk (nk, 0.0),
611 flow_ijk (nk, 0.0);
612 for (size_t k = 0; k < nk; k++)
613 {
614 exp_jk [k] = dens_to [j] *
615 exp (-d [toi [j]] / k_from [i + k * nfrom]);
616 flow_ijk [k] = dens_from [i] * exp_jk [k];
617 expsum [k] += exp_jk [k];
618 }
619
620 // need to count how long the path is, so flows on
621 // each edge can be divided by this length
622 int path_len = 1;
623 if (norm_sums)
624 {
625 path_len = 0;
626 long int target_t = static_cast <long int> (toi [j]);
627 size_t from_t = static_cast <size_t> (dp_fromi [i]);
628 while (target_t < INFINITE_INT)
629 {
630 size_t target_size_t = static_cast <size_t> (target_t);
631 path_len++;
632 target_t = prev [target_size_t];
633 if (target_t < 0 || target_size_t == from_t)
634 break;
635 }
636 }
637
638 long int target = static_cast <long int> (toi [j]); // can equal -1
639 while (target < INFINITE_INT)
640 {
641 size_t stt = static_cast <size_t> (target);
642 if (prev [stt] >= 0 && prev [stt] < INFINITE_INT)
643 {
644 std::string v2 = "f" +
645 vert_name [static_cast <size_t> (prev [stt])] +
646 "t" + vert_name [stt];
647 // multiple flows can aggregate to same edge, so
648 // this has to be +=, not just =!
649 size_t index = verts_to_edge_map.at (v2);
650 for (size_t k = 0; k < nk; k++)
651 {
652 flows_i [index + k * nedges] +=
653 flow_ijk [k] /
654 static_cast <double> (path_len);
655 }
656 }
657
658 target = static_cast <long int> (prev [stt]);
659 // Only allocate that flow from origin vertex v to all
660 // previous vertices up until the target vi
661 if (target < 0L || target == dp_fromi [i])
662 {
663 break;
664 }
665 }
666 }
667 }
668 } // end for j
669 for (size_t k = 0; k < nk; k++)
670 if (expsum [k] > tol)
671 for (size_t j = 0; j < nedges; j++)
672 output [j + k * nedges] +=
673 flows_i [j + k * nedges] / expsum [k];
674 } // end for i
675 } // end parallel function operator
676
677 void join (const OneSI& rhs)
678 {
679 for (size_t i = 0; i < output.size (); i++)
680 output [i] += rhs.output [i];
681 }
682};
683
684//' rcpp_flows_aggregate_par
685//'
686//' @param graph The data.frame holding the graph edges
687//' @param vert_map_in map from <std::string> vertex ID to (0-indexed) integer
688//' index of vertices
689//' @param fromi Index into vert_map_in of vertex numbers
690//' @param toi Index into vert_map_in of vertex numbers
691//' @param tol Relative tolerance in terms of flows below which targets
692//' (to-vertices) are not considered.
693//'
694//' @note The parallelisation is achieved by dumping the results of each thread
695//' to a file, with aggregation performed at the end by simply reading back and
696//' aggregating all files. There is no way to aggregate into a single vector
697//' because threads have to be independent. The only danger with this approach
698//' is that multiple threads may generate the same file names, but with names 10
699//' characters long, that chance should be 1 / 62 ^ 10.
700//'
701//' @noRd
702// [[Rcpp::export]]
703Rcpp::NumericVector rcpp_flows_aggregate_par (const Rcpp::DataFrame graph,
704 const Rcpp::DataFrame vert_map_in,
705 Rcpp::IntegerVector fromi,
706 Rcpp::IntegerVector toi_in,
707 Rcpp::NumericMatrix flows,
708 const bool norm_sums,
709 const double tol,
710 const std::string heap_type)
711{
712 std::vector <size_t> toi =
713 Rcpp::as <std::vector <size_t> > (toi_in);
714 const size_t nfrom = static_cast <size_t> (fromi.size ());
715
716 const std::vector <std::string> from = graph ["from"];
717 const std::vector <std::string> to = graph ["to"];
718 const std::vector <double> dist = graph ["d"];
719 const std::vector <double> wt = graph ["d_weighted"];
720
721 const size_t nedges = static_cast <size_t> (graph.nrow ());
722 const std::vector <std::string> vert_name = vert_map_in ["vert"];
723 const std::vector <size_t> vert_indx = vert_map_in ["id"];
724 // Make map from vertex name to integer index
725 std::map <std::string, size_t> vert_map_i;
726 const size_t nverts = run_sp::make_vert_map (vert_map_in, vert_name,
727 vert_indx, vert_map_i);
728
729 std::unordered_map <std::string, size_t> verts_to_edge_map;
730 std::unordered_map <std::string, double> verts_to_dist_map;
731 run_sp::make_vert_to_edge_maps (from, to, wt, verts_to_edge_map, verts_to_dist_map);
732
733 std::shared_ptr <DGraph> g = std::make_shared <DGraph> (nverts);
734 inst_graph (g, nedges, vert_map_i, from, to, dist, wt);
735
736 // Create parallel worker
737 OneAggregate oneAggregate (RcppParallel::RVector <int> (fromi), toi,
738 RcppParallel::RMatrix <double> (flows), vert_name, verts_to_edge_map,
739 nverts, nedges, norm_sums, tol, heap_type, g);
740
741 size_t chunk_size = run_sp::get_chunk_size (nfrom);
742 RcppParallel::parallelReduce (0, nfrom, oneAggregate, chunk_size);
743
744 return Rcpp::wrap (oneAggregate.output);
745}
746
747
748//' rcpp_flows_aggregate_pairwise
749//'
750//' Pairwise version of flows_aggregate_par
751//'
752//' @param graph The data.frame holding the graph edges
753//' @param vert_map_in map from <std::string> vertex ID to (0-indexed) integer
754//' index of vertices
755//' @param fromi Index into vert_map_in of vertex numbers
756//' @param toi Index into vert_map_in of vertex numbers
757//' @param tol Relative tolerance in terms of flows below which targets
758//' (to-vertices) are not considered.
759//'
760//' @note The parallelisation is achieved by dumping the results of each thread
761//' to a file, with aggregation performed at the end by simply reading back and
762//' aggregating all files. There is no way to aggregate into a single vector
763//' because threads have to be independent. The only danger with this approach
764//' is that multiple threads may generate the same file names, but with names 10
765//' characters long, that chance should be 1 / 62 ^ 10.
766//'
767//' @noRd
768// [[Rcpp::export]]
769Rcpp::NumericVector rcpp_flows_aggregate_pairwise (const Rcpp::DataFrame graph,
770 const Rcpp::DataFrame vert_map_in,
771 Rcpp::IntegerVector fromi,
772 Rcpp::IntegerVector toi,
773 Rcpp::NumericVector flows,
774 const bool norm_sums,
775 const double tol,
776 const std::string heap_type)
777{
778 if (fromi.size () != toi.size ())
779 Rcpp::stop ("pairwise dists must have from.size == to.size");
780 long int n = fromi.size ();
781 size_t n_st = static_cast <size_t> (n);
782
783 const std::vector <std::string> from = graph ["from"];
784 const std::vector <std::string> to = graph ["to"];
785 const std::vector <double> dist = graph ["d"];
786 const std::vector <double> wt = graph ["d_weighted"];
787
788 const size_t nedges = static_cast <size_t> (graph.nrow ());
789 const std::vector <std::string> vert_name = vert_map_in ["vert"];
790 const std::vector <size_t> vert_indx = vert_map_in ["id"];
791 // Make map from vertex name to integer index
792 std::map <std::string, size_t> vert_map_i;
793 const size_t nverts = run_sp::make_vert_map (vert_map_in, vert_name,
794 vert_indx, vert_map_i);
795
796 std::unordered_map <std::string, size_t> verts_to_edge_map;
797 std::unordered_map <std::string, double> verts_to_dist_map;
798 run_sp::make_vert_to_edge_maps (from, to, wt, verts_to_edge_map, verts_to_dist_map);
799
800 std::shared_ptr <DGraph> g = std::make_shared <DGraph> (nverts);
801 inst_graph (g, nedges, vert_map_i, from, to, dist, wt);
802
803 // Paired (fromi, toi) in a single vector
804 Rcpp::IntegerVector fromto (2 * n_st);
805 for (int i = 0; i < n; i++)
806 {
807 size_t i_t = static_cast <size_t> (i);
808 fromto [i] = fromi (i_t);
809 fromto [i + n] = toi (i_t);
810 }
811
812 // Create parallel worker
813 OneAggregatePaired oneAggregatePaired (RcppParallel::RVector <int> (fromto),
814 RcppParallel::RVector <double> (flows), vert_name, verts_to_edge_map,
815 n_st, nverts, nedges, norm_sums, tol, heap_type, g);
816
817 size_t chunk_size = run_sp::get_chunk_size (n_st);
818 RcppParallel::parallelReduce (0, n_st, oneAggregatePaired, chunk_size);
819
820 return Rcpp::wrap (oneAggregatePaired.output);
821}
822
823
824
825//' rcpp_flows_disperse_par
826//'
827//' Modified version of \code{rcpp_flows_aggregate} that aggregates flows to all
828//' destinations from given set of origins, with flows attenuated by distance
829//' from those origins.
830//'
831//' @param graph The data.frame holding the graph edges
832//' @param vert_map_in map from <std::string> vertex ID to (0-indexed) integer
833//' index of vertices
834//' @param fromi Index into vert_map_in of vertex numbers
835//' @param k Coefficient of (current proof-of-principle-only) exponential
836//' distance decay function. If value of \code{k<0} is given, a standard
837//' logistic polynomial will be used.
838//'
839//' @note The flow data to be used for aggregation is a matrix mapping flows
840//' between each pair of from and to points.
841//'
842//' @noRd
843// [[Rcpp::export]]
844Rcpp::NumericVector rcpp_flows_disperse_par (const Rcpp::DataFrame graph,
845 const Rcpp::DataFrame vert_map_in,
846 Rcpp::IntegerVector fromi,
847 Rcpp::NumericVector k,
848 Rcpp::NumericVector dens,
849 const double &tol,
850 std::string heap_type)
851{
852 const size_t nfrom = static_cast <size_t> (fromi.size ());
853
854 std::vector <std::string> from = graph ["from"];
855 std::vector <std::string> to = graph ["to"];
856 std::vector <double> dist = graph ["d"];
857 std::vector <double> wt = graph ["d_weighted"];
858
859 size_t nedges = static_cast <size_t> (graph.nrow ());
860 std::vector <std::string> vert_name = vert_map_in ["vert"];
861 std::vector <size_t> vert_indx = vert_map_in ["id"];
862 // Make map from vertex name to integer index
863 std::map <std::string, size_t> vert_map_i;
864 size_t nverts = run_sp::make_vert_map (vert_map_in, vert_name,
865 vert_indx, vert_map_i);
866
867 std::unordered_map <std::string, size_t> verts_to_edge_map;
868 std::unordered_map <std::string, double> verts_to_dist_map;
869 run_sp::make_vert_to_edge_maps (from, to, wt, verts_to_edge_map, verts_to_dist_map);
870
871 std::shared_ptr<DGraph> g = std::make_shared<DGraph> (nverts);
872 inst_graph (g, nedges, vert_map_i, from, to, dist, wt);
873
874 // Create parallel worker
875 OneDisperse oneDisperse (RcppParallel::RVector <int> (fromi),
876 RcppParallel::RVector <double> (dens),
877 vert_name, verts_to_edge_map,
878 nverts, nedges,
879 RcppParallel::RVector <double> (k), tol, heap_type, g);
880
881 size_t chunk_size = run_sp::get_chunk_size (nfrom);
882 RcppParallel::parallelReduce (0, nfrom, oneDisperse, chunk_size);
883
884 return Rcpp::wrap (oneDisperse.output);
885}
886
887
888//' rcpp_flows_si
889//'
890//' @param graph The data.frame holding the graph edges
891//' @param vert_map_in map from <std::string> vertex ID to (0-indexed) integer
892//' index of vertices
893//' @param fromi Index into vert_map_in of vertex numbers
894//' @param toi Index into vert_map_in of vertex numbers
895//' @param kvec Vector of k-values for each fromi
896//' @param nvec Vector of density-values for each fromi
897//' @param tol Relative tolerance in terms of flows below which targets
898//' (to-vertices) are not considered.
899//'
900//' @noRd
901// [[Rcpp::export]]
902Rcpp::NumericVector rcpp_flows_si (const Rcpp::DataFrame graph,
903 const Rcpp::DataFrame vert_map_in,
904 Rcpp::IntegerVector fromi,
905 Rcpp::IntegerVector toi_in,
906 Rcpp::NumericVector kvec,
907 Rcpp::NumericVector dens_from,
908 Rcpp::NumericVector dens_to,
909 const bool norm_sums,
910 const double tol,
911 const std::string heap_type)
912{
913 std::vector <size_t> toi =
914 Rcpp::as <std::vector <size_t> > ( toi_in);
915 const size_t nfrom = static_cast <size_t> (fromi.size ());
916
917 const std::vector <std::string> from = graph ["from"];
918 const std::vector <std::string> to = graph ["to"];
919 const std::vector <double> dist = graph ["d"];
920 const std::vector <double> wt = graph ["d_weighted"];
921
922 const size_t nedges = static_cast <size_t> (graph.nrow ());
923 const std::vector <std::string> vert_name = vert_map_in ["vert"];
924 const std::vector <size_t> vert_indx = vert_map_in ["id"];
925 // Make map from vertex name to integer index
926 std::map <std::string, size_t> vert_map_i;
927 const size_t nverts = run_sp::make_vert_map (vert_map_in, vert_name,
928 vert_indx, vert_map_i);
929
930 std::unordered_map <std::string, size_t> verts_to_edge_map;
931 std::unordered_map <std::string, double> verts_to_dist_map;
932 run_sp::make_vert_to_edge_maps (from, to, wt, verts_to_edge_map, verts_to_dist_map);
933
934 std::shared_ptr <DGraph> g = std::make_shared <DGraph> (nverts);
935 inst_graph (g, nedges, vert_map_i, from, to, dist, wt);
936
937 // Create parallel worker
938 OneSI oneSI (RcppParallel::RVector <int> (fromi), toi,
939 RcppParallel::RVector <double> (kvec),
940 RcppParallel::RVector <double> (dens_from),
941 RcppParallel::RVector <double> (dens_to),
942 vert_name, verts_to_edge_map,
943 nverts, nedges, norm_sums, tol, heap_type, g);
944
945 size_t chunk_size = run_sp::get_chunk_size (nfrom);
946 RcppParallel::parallelReduce (0, nfrom, oneSI, chunk_size);
947
948 return Rcpp::wrap (oneSI.output);
949}