Distances on Directed Graphs in R
at main 949 lines 40 kB view raw
1 2#include "run_sp.h" 3#include "flows.h" 4 5#include "dgraph.h" 6#include "heaps/heap_lib.h" 7 8template <typename T> 9void inst_graph (std::shared_ptr<DGraph> g, size_t nedges, 10 const std::map <std::string, size_t>& vert_map, 11 const std::vector <std::string>& from, 12 const std::vector <std::string>& to, 13 const std::vector <T>& dist, 14 const std::vector <T>& wt) 15{ 16 for (size_t i = 0; i < nedges; ++i) 17 { 18 size_t fromi = vert_map.at(from [i]); 19 size_t toi = vert_map.at(to [i]); 20 g->addNewEdge (fromi, toi, dist [i], wt [i], i); 21 } 22} 23 24struct OneAggregate : public RcppParallel::Worker 25{ 26 RcppParallel::RVector <int> dp_fromi; 27 const std::vector <size_t> toi; 28 const RcppParallel::RMatrix <double> flows; 29 const std::vector <std::string> vert_name; 30 const std::unordered_map <std::string, size_t> verts_to_edge_map; 31 size_t nverts; // can't be const because of reinterpret cast 32 size_t nedges; 33 const bool norm_sums; 34 const double tol; 35 const std::string heap_type; 36 std::shared_ptr <DGraph> g; 37 38 std::vector <double> output; 39 40 // Constructor 1: The main constructor 41 OneAggregate ( 42 const RcppParallel::RVector <int> fromi, 43 const std::vector <size_t> toi_in, 44 const RcppParallel::RMatrix <double> flows_in, 45 const std::vector <std::string> vert_name_in, 46 const std::unordered_map <std::string, size_t> verts_to_edge_map_in, 47 const size_t nverts_in, 48 const size_t nedges_in, 49 const bool norm_sums_in, 50 const double tol_in, 51 const std::string &heap_type_in, 52 const std::shared_ptr <DGraph> g_in) : 53 dp_fromi (fromi), toi (toi_in), flows (flows_in), 54 vert_name (vert_name_in), 55 verts_to_edge_map (verts_to_edge_map_in), 56 nverts (nverts_in), nedges (nedges_in), norm_sums (norm_sums_in), 57 tol (tol_in), heap_type (heap_type_in), g (g_in), output () 58 { 59 output.resize (nedges, 0.0); 60 } 61 62 // Constructor 2: The Split constructor 63 OneAggregate ( 64 const OneAggregate& oneAggregate, 65 RcppParallel::Split) : 66 dp_fromi (oneAggregate.dp_fromi), 67 toi (oneAggregate.toi), 68 flows (oneAggregate.flows), 69 vert_name (oneAggregate.vert_name), 70 verts_to_edge_map (oneAggregate.verts_to_edge_map), 71 nverts (oneAggregate.nverts), 72 nedges (oneAggregate.nedges), 73 norm_sums (oneAggregate.norm_sums), 74 tol (oneAggregate.tol), 75 heap_type (oneAggregate.heap_type), 76 g (oneAggregate.g), output () 77 { 78 output.resize (nedges, 0.0); 79 } 80 81 // Parallel function operator 82 void operator() (size_t begin, size_t end) 83 { 84 std::shared_ptr<PF::PathFinder> pathfinder = 85 std::make_shared <PF::PathFinder> (nverts, 86 *run_sp::getHeapImpl (heap_type), g); 87 std::vector <double> w (nverts); 88 std::vector <double> d (nverts); 89 std::vector <long int> prev (nverts); 90 91 for (size_t i = begin; i < end; i++) 92 { 93 //if (RcppThread::isInterrupted (i % static_cast<int>(100) == 0)) 94 //if (RcppThread::isInterrupted ()) 95 // return; 96 97 // These have to be reserved within the parallel operator function! 98 std::fill (w.begin (), w.end (), INFINITE_DOUBLE); 99 std::fill (d.begin (), d.end (), INFINITE_DOUBLE); 100 std::fill (prev.begin (), prev.end (), INFINITE_INT); 101 102 size_t from_i = static_cast <size_t> (dp_fromi [i]); 103 d [from_i] = w [from_i] = 0.0; 104 105 // reduce toi to only those within tolerance limt 106 double flim = 0.0; 107 size_t nto = toi.size (); 108 if (tol > 0.0) { 109 double fmax = 0.0; 110 for (size_t j = 0; j < static_cast <size_t> (flows.ncol ()); j++) 111 if (flows (i, j) > fmax) 112 fmax = flows (i, j); 113 flim = fmax * tol; 114 size_t nto = 0; 115 for (size_t j = 0; j < static_cast <size_t> (flows.ncol ()); j++) 116 if (flows (i, j) > flim) 117 nto++; 118 119 if (nto == 0) 120 continue; 121 } 122 123 // toi_index is into cols of flow matrix 124 // toi_reduced is into the vertex vectors (d, w, prev) 125 std::vector <size_t> toi_reduced, toi_index; 126 toi_reduced.reserve (nto); 127 toi_index.reserve (nto); 128 for (size_t j = 0; j < static_cast <size_t> (flows.ncol ()); j++) 129 if (flows (i, j) > flim) 130 { 131 toi_index.push_back (static_cast <size_t> (j)); 132 toi_reduced.push_back (toi [j]); 133 } 134 135 pathfinder->Dijkstra (d, w, prev, from_i, toi_reduced); 136 for (size_t j = 0; j < toi_reduced.size (); j++) 137 { 138 if (from_i != toi_reduced [j]) // Exclude self-flows 139 { 140 double flow_ij = flows (i, toi_index [j]); 141 if (w [toi_reduced [j]] < INFINITE_DOUBLE && flow_ij > 0.0) 142 { 143 // count how long the path is, so flows on 144 // each edge can be divided by this length 145 int path_len = 1; 146 if (norm_sums) 147 { 148 path_len = 0; 149 long int target_t = static_cast <long int> (toi_reduced [j]); 150 size_t from_t = static_cast <size_t> (dp_fromi [i]); 151 while (target_t < INFINITE_INT) 152 { 153 path_len++; 154 size_t target_size_t = static_cast <size_t> (target_t); 155 target_t = prev [target_size_t]; 156 if (target_t < 0 || target_size_t == from_t) 157 break; 158 } 159 } 160 161 long int target = static_cast <int> (toi_reduced [j]); // can equal -1 162 while (target < INFINITE_INT) 163 { 164 size_t stt = static_cast <size_t> (target); 165 if (prev [stt] >= 0 && prev [stt] < INFINITE_INT) 166 { 167 std::string v2 = "f" + 168 vert_name [static_cast <size_t> (prev [stt])] + 169 "t" + vert_name [stt]; 170 output [verts_to_edge_map.at (v2)] += 171 flow_ij / static_cast <double> (path_len); 172 } 173 174 target = prev [stt]; 175 // Only allocate that flow from origin vertex v to all 176 // previous vertices up until the target vi 177 if (target < 0L || target == dp_fromi [i]) 178 { 179 break; 180 } 181 } 182 } 183 } 184 } 185 } // end for i 186 } // end parallel function operator 187 188 void join (const OneAggregate &rhs) 189 { 190 for (size_t i = 0; i < output.size (); i++) 191 output [i] += rhs.output [i]; 192 } 193}; 194 195 196struct OneAggregatePaired : public RcppParallel::Worker 197{ 198 RcppParallel::RVector <int> dp_fromtoi; 199 const RcppParallel::RVector <double> flows; 200 const std::vector <std::string> vert_name; 201 const std::unordered_map <std::string, size_t> verts_to_edge_map; 202 size_t nfrom; 203 size_t nverts; // can't be const because of reinterpret cast 204 size_t nedges; 205 const bool norm_sums; 206 const double tol; 207 const std::string heap_type; 208 std::shared_ptr <DGraph> g; 209 210 std::vector <double> output; 211 212 // Constructor 1: The main constructor 213 OneAggregatePaired ( 214 const RcppParallel::RVector <int> fromtoi, 215 const RcppParallel::RVector <double> flows_in, 216 const std::vector <std::string> vert_name_in, 217 const std::unordered_map <std::string, size_t> verts_to_edge_map_in, 218 const size_t nfrom_in, 219 const size_t nverts_in, 220 const size_t nedges_in, 221 const bool norm_sums_in, 222 const double tol_in, 223 const std::string &heap_type_in, 224 const std::shared_ptr <DGraph> g_in 225 ) : 226 dp_fromtoi (fromtoi), flows (flows_in), 227 vert_name (vert_name_in), 228 verts_to_edge_map (verts_to_edge_map_in), 229 nfrom (nfrom_in), nverts (nverts_in), nedges (nedges_in), 230 norm_sums (norm_sums_in), tol (tol_in), heap_type (heap_type_in), 231 g (g_in), output () 232 { 233 output.resize (nedges, 0.0); 234 } 235 236 // Constructor 2: The Split constructor 237 OneAggregatePaired ( 238 const OneAggregatePaired& oneAggregatePaired, 239 RcppParallel::Split) : 240 dp_fromtoi (oneAggregatePaired.dp_fromtoi), 241 flows (oneAggregatePaired.flows), 242 vert_name (oneAggregatePaired.vert_name), 243 verts_to_edge_map (oneAggregatePaired.verts_to_edge_map), 244 nfrom (oneAggregatePaired.nfrom), 245 nverts (oneAggregatePaired.nverts), 246 nedges (oneAggregatePaired.nedges), 247 norm_sums (oneAggregatePaired.norm_sums), 248 tol (oneAggregatePaired.tol), 249 heap_type (oneAggregatePaired.heap_type), 250 g (oneAggregatePaired.g), output () 251 { 252 output.resize (nedges, 0.0); 253 } 254 255 // Parallel function operator 256 void operator() (size_t begin, size_t end) 257 { 258 std::shared_ptr<PF::PathFinder> pathfinder = 259 std::make_shared <PF::PathFinder> (nverts, 260 *run_sp::getHeapImpl (heap_type), g); 261 std::vector <double> w (nverts); 262 std::vector <double> d (nverts); 263 std::vector <long int> prev (nverts); 264 265 for (size_t i = begin; i < end; i++) 266 { 267 //if (RcppThread::isInterrupted (i % static_cast<int>(100) == 0)) 268 //if (RcppThread::isInterrupted ()) 269 // return; 270 271 const size_t from_i = static_cast <size_t> (dp_fromtoi [i]); 272 // to_i has to be a vector for Dijkstra algo, but has only 1 273 // element. 274 const std::vector <size_t> to_i = {static_cast <size_t> (dp_fromtoi [nfrom + i])}; 275 276 // These have to be reserved within the parallel operator function! 277 std::fill (w.begin (), w.end (), INFINITE_DOUBLE); 278 std::fill (d.begin (), d.end (), INFINITE_DOUBLE); 279 std::fill (prev.begin (), prev.end (), INFINITE_INT); 280 281 d [from_i] = w [from_i] = 0.0; 282 283 pathfinder->Dijkstra (d, w, prev, from_i, to_i); 284 // loop has always only one element here: 285 for (size_t j = 0; j < to_i.size (); j++) 286 { 287 if (w [to_i [j]] < INFINITE_DOUBLE && flows [i] > 0.0) 288 { 289 // count how long the path is, so flows on 290 // each edge can be divided by this length 291 int path_len = 1; 292 if (norm_sums) 293 { 294 path_len = 0; 295 long int target_t = static_cast <long int> (to_i [j]); 296 while (target_t < INFINITE_INT) 297 { 298 path_len++; 299 size_t target_size_t = static_cast <size_t> (target_t); 300 target_t = prev [target_size_t]; 301 if (target_t < 0 || target_size_t == from_i) 302 break; 303 } 304 } 305 306 long int target = static_cast <int> (to_i [j]); // can equal -1 307 while (target < INFINITE_INT) 308 { 309 size_t stt = static_cast <size_t> (target); 310 if (prev [stt] >= 0 && prev [stt] < INFINITE_INT) 311 { 312 std::string v2 = "f" + 313 vert_name [static_cast <size_t> (prev [stt])] + 314 "t" + vert_name [stt]; 315 output [verts_to_edge_map.at (v2)] += 316 flows [i] / static_cast <double> (path_len); 317 } 318 319 target = prev [stt]; 320 // Only allocate that flow from origin vertex v to all 321 // previous vertices up until the target vi 322 if (target < 0L || target == from_i) 323 { 324 break; 325 } 326 } 327 } 328 } 329 } // end for i 330 } // end parallel function operator 331 332 void join (const OneAggregatePaired &rhs) 333 { 334 for (size_t i = 0; i < output.size (); i++) 335 output [i] += rhs.output [i]; 336 } 337}; 338 339 340struct OneDisperse : public RcppParallel::Worker 341{ 342 RcppParallel::RVector <int> dp_fromi; 343 const RcppParallel::RVector <double> dens; 344 const std::vector <std::string> vert_name; 345 const std::unordered_map <std::string, size_t> verts_to_edge_map; 346 size_t nverts; // can't be const because of reinterpret cast 347 size_t nedges; 348 const RcppParallel::RVector <double> kfrom; 349 const double tol; 350 const std::string heap_type; 351 std::shared_ptr <DGraph> g; 352 353 std::vector <double> output; 354 355 // Constructor 1: The main constructor 356 OneDisperse ( 357 const RcppParallel::RVector <int> fromi, 358 const RcppParallel::RVector <double> dens_in, 359 const std::vector <std::string> vert_name_in, 360 const std::unordered_map <std::string, size_t> verts_to_edge_map_in, 361 const size_t nverts_in, 362 const size_t nedges_in, 363 const RcppParallel::RVector <double> kfrom_in, 364 const double tol_in, 365 const std::string &heap_type_in, 366 const std::shared_ptr <DGraph> g_in) : 367 dp_fromi (fromi), dens (dens_in), vert_name (vert_name_in), 368 verts_to_edge_map (verts_to_edge_map_in), 369 nverts (nverts_in), nedges (nedges_in), kfrom (kfrom_in), 370 tol (tol_in), heap_type (heap_type_in), g (g_in), output () 371 { 372 const R_xlen_t nfrom = static_cast <R_xlen_t> (dens.size ()); 373 const R_xlen_t nk = static_cast <R_xlen_t> (kfrom.size ()) / nfrom; 374 const size_t out_size = nedges * static_cast <size_t> (nk); 375 output.resize (out_size, 0.0); 376 } 377 378 // Constructor 2: The Split constructor 379 OneDisperse ( 380 const OneDisperse& oneDisperse, 381 RcppParallel::Split) : 382 dp_fromi (oneDisperse.dp_fromi), dens (oneDisperse.dens), 383 vert_name (oneDisperse.vert_name), 384 verts_to_edge_map (oneDisperse.verts_to_edge_map), 385 nverts (oneDisperse.nverts), nedges (oneDisperse.nedges), 386 kfrom (oneDisperse.kfrom), tol (oneDisperse.tol), 387 heap_type (oneDisperse.heap_type), g (oneDisperse.g), output () 388 { 389 const R_xlen_t nfrom = static_cast <R_xlen_t> (dens.size ()); 390 const R_xlen_t nk = static_cast <R_xlen_t> (kfrom.size ()) / nfrom; 391 size_t out_size = nedges * static_cast <size_t> (nk); 392 output.resize (out_size, 0.0); 393 } 394 395 396 // Parallel function operator 397 void operator() (size_t begin, size_t end) 398 { 399 std::shared_ptr<PF::PathFinder> pathfinder = 400 std::make_shared <PF::PathFinder> (nverts, 401 *run_sp::getHeapImpl (heap_type), g); 402 std::vector <double> w (nverts); 403 std::vector <double> d (nverts); 404 std::vector <long int> prev (nverts); 405 406 const size_t nfrom = dens.size (); 407 const size_t nk = kfrom.size () / nfrom; 408 409 for (size_t i = begin; i < end; i++) // over the from vertices 410 { 411 //if (RcppThread::isInterrupted (i % static_cast<int>(10) == 0)) 412 if (RcppThread::isInterrupted ()) 413 return; 414 415 // translate k-value to distance limit based on tol 416 // exp(-d / k) = tol -> d = -k * log (tol) 417 // k_from holds nk vectors of different k-values, each of length 418 // nedges. 419 double dlim = 0.0; 420 for (size_t k = 0; k < nk; k++) 421 if (kfrom [i + k * nfrom] > dlim) 422 dlim = kfrom [i + k * nfrom]; // dlim is max k-value 423 dlim = -dlim * log (tol); // converted to actual dist limit. 424 425 std::fill (w.begin (), w.end (), INFINITE_DOUBLE); 426 std::fill (d.begin (), d.end (), INFINITE_DOUBLE); 427 std::fill (prev.begin (), prev.end (), INFINITE_INT); 428 429 const size_t from_i = static_cast <size_t> (dp_fromi [i]); 430 d [from_i] = w [from_i] = 0.0; 431 432 pathfinder->DijkstraLimit (d, w, prev, from_i, dlim); 433 434 std::vector <double> flows_i (nedges * nk, 0.0); 435 std::vector <double> expsum (nk, 0.0); 436 437 for (size_t j = 0; j < nverts; j++) 438 { 439 if (prev [j] >= 0 && prev [j] < INFINITE_INT) 440 { 441 const std::string vert_to = vert_name [j], 442 vert_from = vert_name [static_cast <size_t> (prev [j])]; 443 const std::string two_verts = "f" + vert_from + "t" + vert_to; 444 445 size_t index = verts_to_edge_map.at (two_verts); 446 if (d [j] < INFINITE_DOUBLE) 447 { 448 for (size_t k = 0; k < nk; k++) 449 { 450 double exp_jk; 451 if (kfrom [i + k * nfrom] > 0.0) 452 exp_jk = exp (-d [j] / kfrom [i + k * nfrom]); 453 else 454 { 455 // standard logistic polygonal for UK cycling 456 // models 457 double lp = -3.894 + (-0.5872 * d [j]) + 458 (1.832 * sqrt (d [j])) + 459 (0.007956 * d [j] * d [j]); 460 exp_jk = exp (lp) / (1.0 + exp (lp)); 461 } 462 const size_t k_st = static_cast <size_t> (k); 463 expsum [k_st] += exp_jk; 464 flows_i [index + k_st * nedges] += dens [i] * exp_jk; 465 } 466 } 467 } 468 } // end for j 469 for (size_t k = 0; k < nk; k++) 470 if (expsum [k] > tol) 471 for (size_t j = 0; j < nedges; j++) 472 output [j + k * nedges] += 473 flows_i [j + k * nedges] / expsum [k]; 474 } // end for i 475 } // end parallel function operator 476 477 void join (const OneDisperse &rhs) 478 { 479 for (size_t i = 0; i < output.size (); i++) 480 output [i] += rhs.output [i]; 481 } 482}; 483 484struct OneSI : public RcppParallel::Worker 485{ 486 RcppParallel::RVector <int> dp_fromi; 487 const std::vector <size_t> toi; 488 const RcppParallel::RVector <double> k_from; 489 const RcppParallel::RVector <double> dens_from; 490 const RcppParallel::RVector <double> dens_to; 491 const std::vector <std::string> vert_name; 492 const std::unordered_map <std::string, size_t> verts_to_edge_map; 493 size_t nverts; // can't be const because of reinterpret cast 494 size_t nedges; 495 const bool norm_sums; 496 const double tol; 497 const std::string heap_type; 498 std::shared_ptr <DGraph> g; 499 500 std::vector <double> output; 501 502 // Constructor 1: The main constructor 503 OneSI ( 504 const RcppParallel::RVector <int> fromi, 505 const std::vector <size_t> toi_in, 506 const RcppParallel::RVector <double> k_from_in, 507 const RcppParallel::RVector <double> dens_from_in, 508 const RcppParallel::RVector <double> dens_to_in, 509 const std::vector <std::string> vert_name_in, 510 const std::unordered_map <std::string, size_t> verts_to_edge_map_in, 511 const size_t nverts_in, 512 const size_t nedges_in, 513 const bool norm_sums_in, 514 const double tol_in, 515 const std::string &heap_type_in, 516 const std::shared_ptr <DGraph> g_in) : 517 dp_fromi (fromi), toi (toi_in), k_from (k_from_in), 518 dens_from (dens_from_in), dens_to (dens_to_in), 519 vert_name (vert_name_in), verts_to_edge_map (verts_to_edge_map_in), 520 nverts (nverts_in), nedges (nedges_in), norm_sums (norm_sums_in), 521 tol (tol_in), heap_type (heap_type_in), g (g_in), output () 522 { 523 const size_t nfrom = dens_from.size (); 524 const size_t nk = k_from.size () / nfrom; 525 const size_t out_size = nedges * nk; 526 output.resize (out_size, 0.0); 527 } 528 529 // Constructor 2: The Split constructor 530 OneSI ( 531 const OneSI& oneSI, 532 RcppParallel::Split) : 533 dp_fromi (oneSI.dp_fromi), toi (oneSI.toi), k_from (oneSI.k_from), 534 dens_from (oneSI.dens_from), dens_to (oneSI.dens_to), 535 vert_name (oneSI.vert_name), verts_to_edge_map (oneSI.verts_to_edge_map), 536 nverts (oneSI.nverts), nedges (oneSI.nedges), 537 norm_sums (oneSI.norm_sums), tol (oneSI.tol), 538 heap_type (oneSI.heap_type), g (oneSI.g), output () 539 { 540 const size_t nfrom = dens_from.size (); 541 const size_t nk = k_from.size () / nfrom; 542 const size_t out_size = nedges * nk; 543 output.resize (out_size, 0.0); 544 } 545 546 // Parallel function operator 547 void operator() (size_t begin, size_t end) 548 { 549 std::shared_ptr<PF::PathFinder> pathfinder = 550 std::make_shared <PF::PathFinder> (nverts, 551 *run_sp::getHeapImpl (heap_type), g); 552 std::vector <double> w (nverts); 553 std::vector <double> d (nverts); 554 std::vector <long int> prev (nverts); 555 556 // k_from can have multiple vectors of k-values, each equal in length to 557 // the number of 'from' points. The output is then a single vector of 558 // 'nedges' wrapped 'nk' times. 559 const size_t nfrom = dens_from.size (); 560 const size_t nk = k_from.size () / nfrom; 561 562 for (size_t i = begin; i < end; i++) 563 { 564 //if (RcppThread::isInterrupted (i % static_cast<int>(100) == 0)) 565 if (RcppThread::isInterrupted ()) 566 return; 567 568 // These have to be reserved within the parallel operator function! 569 std::fill (w.begin (), w.end (), INFINITE_DOUBLE); 570 std::fill (d.begin (), d.end (), INFINITE_DOUBLE); 571 std::fill (prev.begin (), prev.end (), INFINITE_INT); 572 573 size_t from_i = static_cast <size_t> (dp_fromi [i]); 574 d [from_i] = w [from_i] = 0.0; 575 576 // translate k-value to distance limit based on tol 577 // exp(-d / k) = tol -> d = -k * log (tol) 578 // const double dlim = -k_from [i] * log (tol); 579 // k_from holds nk vectors of different k-values, each of length 580 // nedges. 581 double dlim = 0.0; 582 for (size_t k = 0; k < nk; k++) 583 if (k_from [i + k * nfrom] > dlim) 584 dlim = k_from [i + k * nfrom]; 585 dlim = -dlim * log (tol); 586 587 pathfinder->DijkstraLimit (d, w, prev, from_i, dlim); 588 589 std::vector <double> flows_i (nedges * nk, 0.0); 590 std::vector <double> expsum (nk, 0.0); 591 for (size_t j = 0; j < toi.size (); j++) 592 { 593 if (from_i != toi [j]) // Exclude self-flows 594 { 595 if (d [toi [j]] < INFINITE_DOUBLE) 596 { 597 /* Flow between i and j is based on d (i, j), but is 598 * subsequently allocated to all intermediate edges. 599 * The `norm_sums` option re-scales this intermediate 600 * edge allocation such that SI value to one terminal 601 * vertex from the origin vertex of: 602 * N_i N_j exp (-d_{ij}) / sum_j (N_{ij} exp (-d_{ij})) 603 * is divided by the number of intermediate edges 604 * between i and j. This is achieved by initially only 605 * allocating the values of the numerator divided by 606 * numbers of intermediate edges, while accumulating the 607 * denominator independent of that value in order to 608 * divide all final values. 609 */ 610 std::vector <double> exp_jk (nk, 0.0), 611 flow_ijk (nk, 0.0); 612 for (size_t k = 0; k < nk; k++) 613 { 614 exp_jk [k] = dens_to [j] * 615 exp (-d [toi [j]] / k_from [i + k * nfrom]); 616 flow_ijk [k] = dens_from [i] * exp_jk [k]; 617 expsum [k] += exp_jk [k]; 618 } 619 620 // need to count how long the path is, so flows on 621 // each edge can be divided by this length 622 int path_len = 1; 623 if (norm_sums) 624 { 625 path_len = 0; 626 long int target_t = static_cast <long int> (toi [j]); 627 size_t from_t = static_cast <size_t> (dp_fromi [i]); 628 while (target_t < INFINITE_INT) 629 { 630 size_t target_size_t = static_cast <size_t> (target_t); 631 path_len++; 632 target_t = prev [target_size_t]; 633 if (target_t < 0 || target_size_t == from_t) 634 break; 635 } 636 } 637 638 long int target = static_cast <long int> (toi [j]); // can equal -1 639 while (target < INFINITE_INT) 640 { 641 size_t stt = static_cast <size_t> (target); 642 if (prev [stt] >= 0 && prev [stt] < INFINITE_INT) 643 { 644 std::string v2 = "f" + 645 vert_name [static_cast <size_t> (prev [stt])] + 646 "t" + vert_name [stt]; 647 // multiple flows can aggregate to same edge, so 648 // this has to be +=, not just =! 649 size_t index = verts_to_edge_map.at (v2); 650 for (size_t k = 0; k < nk; k++) 651 { 652 flows_i [index + k * nedges] += 653 flow_ijk [k] / 654 static_cast <double> (path_len); 655 } 656 } 657 658 target = static_cast <long int> (prev [stt]); 659 // Only allocate that flow from origin vertex v to all 660 // previous vertices up until the target vi 661 if (target < 0L || target == dp_fromi [i]) 662 { 663 break; 664 } 665 } 666 } 667 } 668 } // end for j 669 for (size_t k = 0; k < nk; k++) 670 if (expsum [k] > tol) 671 for (size_t j = 0; j < nedges; j++) 672 output [j + k * nedges] += 673 flows_i [j + k * nedges] / expsum [k]; 674 } // end for i 675 } // end parallel function operator 676 677 void join (const OneSI& rhs) 678 { 679 for (size_t i = 0; i < output.size (); i++) 680 output [i] += rhs.output [i]; 681 } 682}; 683 684//' rcpp_flows_aggregate_par 685//' 686//' @param graph The data.frame holding the graph edges 687//' @param vert_map_in map from <std::string> vertex ID to (0-indexed) integer 688//' index of vertices 689//' @param fromi Index into vert_map_in of vertex numbers 690//' @param toi Index into vert_map_in of vertex numbers 691//' @param tol Relative tolerance in terms of flows below which targets 692//' (to-vertices) are not considered. 693//' 694//' @note The parallelisation is achieved by dumping the results of each thread 695//' to a file, with aggregation performed at the end by simply reading back and 696//' aggregating all files. There is no way to aggregate into a single vector 697//' because threads have to be independent. The only danger with this approach 698//' is that multiple threads may generate the same file names, but with names 10 699//' characters long, that chance should be 1 / 62 ^ 10. 700//' 701//' @noRd 702// [[Rcpp::export]] 703Rcpp::NumericVector rcpp_flows_aggregate_par (const Rcpp::DataFrame graph, 704 const Rcpp::DataFrame vert_map_in, 705 Rcpp::IntegerVector fromi, 706 Rcpp::IntegerVector toi_in, 707 Rcpp::NumericMatrix flows, 708 const bool norm_sums, 709 const double tol, 710 const std::string heap_type) 711{ 712 std::vector <size_t> toi = 713 Rcpp::as <std::vector <size_t> > (toi_in); 714 const size_t nfrom = static_cast <size_t> (fromi.size ()); 715 716 const std::vector <std::string> from = graph ["from"]; 717 const std::vector <std::string> to = graph ["to"]; 718 const std::vector <double> dist = graph ["d"]; 719 const std::vector <double> wt = graph ["d_weighted"]; 720 721 const size_t nedges = static_cast <size_t> (graph.nrow ()); 722 const std::vector <std::string> vert_name = vert_map_in ["vert"]; 723 const std::vector <size_t> vert_indx = vert_map_in ["id"]; 724 // Make map from vertex name to integer index 725 std::map <std::string, size_t> vert_map_i; 726 const size_t nverts = run_sp::make_vert_map (vert_map_in, vert_name, 727 vert_indx, vert_map_i); 728 729 std::unordered_map <std::string, size_t> verts_to_edge_map; 730 std::unordered_map <std::string, double> verts_to_dist_map; 731 run_sp::make_vert_to_edge_maps (from, to, wt, verts_to_edge_map, verts_to_dist_map); 732 733 std::shared_ptr <DGraph> g = std::make_shared <DGraph> (nverts); 734 inst_graph (g, nedges, vert_map_i, from, to, dist, wt); 735 736 // Create parallel worker 737 OneAggregate oneAggregate (RcppParallel::RVector <int> (fromi), toi, 738 RcppParallel::RMatrix <double> (flows), vert_name, verts_to_edge_map, 739 nverts, nedges, norm_sums, tol, heap_type, g); 740 741 size_t chunk_size = run_sp::get_chunk_size (nfrom); 742 RcppParallel::parallelReduce (0, nfrom, oneAggregate, chunk_size); 743 744 return Rcpp::wrap (oneAggregate.output); 745} 746 747 748//' rcpp_flows_aggregate_pairwise 749//' 750//' Pairwise version of flows_aggregate_par 751//' 752//' @param graph The data.frame holding the graph edges 753//' @param vert_map_in map from <std::string> vertex ID to (0-indexed) integer 754//' index of vertices 755//' @param fromi Index into vert_map_in of vertex numbers 756//' @param toi Index into vert_map_in of vertex numbers 757//' @param tol Relative tolerance in terms of flows below which targets 758//' (to-vertices) are not considered. 759//' 760//' @note The parallelisation is achieved by dumping the results of each thread 761//' to a file, with aggregation performed at the end by simply reading back and 762//' aggregating all files. There is no way to aggregate into a single vector 763//' because threads have to be independent. The only danger with this approach 764//' is that multiple threads may generate the same file names, but with names 10 765//' characters long, that chance should be 1 / 62 ^ 10. 766//' 767//' @noRd 768// [[Rcpp::export]] 769Rcpp::NumericVector rcpp_flows_aggregate_pairwise (const Rcpp::DataFrame graph, 770 const Rcpp::DataFrame vert_map_in, 771 Rcpp::IntegerVector fromi, 772 Rcpp::IntegerVector toi, 773 Rcpp::NumericVector flows, 774 const bool norm_sums, 775 const double tol, 776 const std::string heap_type) 777{ 778 if (fromi.size () != toi.size ()) 779 Rcpp::stop ("pairwise dists must have from.size == to.size"); 780 long int n = fromi.size (); 781 size_t n_st = static_cast <size_t> (n); 782 783 const std::vector <std::string> from = graph ["from"]; 784 const std::vector <std::string> to = graph ["to"]; 785 const std::vector <double> dist = graph ["d"]; 786 const std::vector <double> wt = graph ["d_weighted"]; 787 788 const size_t nedges = static_cast <size_t> (graph.nrow ()); 789 const std::vector <std::string> vert_name = vert_map_in ["vert"]; 790 const std::vector <size_t> vert_indx = vert_map_in ["id"]; 791 // Make map from vertex name to integer index 792 std::map <std::string, size_t> vert_map_i; 793 const size_t nverts = run_sp::make_vert_map (vert_map_in, vert_name, 794 vert_indx, vert_map_i); 795 796 std::unordered_map <std::string, size_t> verts_to_edge_map; 797 std::unordered_map <std::string, double> verts_to_dist_map; 798 run_sp::make_vert_to_edge_maps (from, to, wt, verts_to_edge_map, verts_to_dist_map); 799 800 std::shared_ptr <DGraph> g = std::make_shared <DGraph> (nverts); 801 inst_graph (g, nedges, vert_map_i, from, to, dist, wt); 802 803 // Paired (fromi, toi) in a single vector 804 Rcpp::IntegerVector fromto (2 * n_st); 805 for (int i = 0; i < n; i++) 806 { 807 size_t i_t = static_cast <size_t> (i); 808 fromto [i] = fromi (i_t); 809 fromto [i + n] = toi (i_t); 810 } 811 812 // Create parallel worker 813 OneAggregatePaired oneAggregatePaired (RcppParallel::RVector <int> (fromto), 814 RcppParallel::RVector <double> (flows), vert_name, verts_to_edge_map, 815 n_st, nverts, nedges, norm_sums, tol, heap_type, g); 816 817 size_t chunk_size = run_sp::get_chunk_size (n_st); 818 RcppParallel::parallelReduce (0, n_st, oneAggregatePaired, chunk_size); 819 820 return Rcpp::wrap (oneAggregatePaired.output); 821} 822 823 824 825//' rcpp_flows_disperse_par 826//' 827//' Modified version of \code{rcpp_flows_aggregate} that aggregates flows to all 828//' destinations from given set of origins, with flows attenuated by distance 829//' from those origins. 830//' 831//' @param graph The data.frame holding the graph edges 832//' @param vert_map_in map from <std::string> vertex ID to (0-indexed) integer 833//' index of vertices 834//' @param fromi Index into vert_map_in of vertex numbers 835//' @param k Coefficient of (current proof-of-principle-only) exponential 836//' distance decay function. If value of \code{k<0} is given, a standard 837//' logistic polynomial will be used. 838//' 839//' @note The flow data to be used for aggregation is a matrix mapping flows 840//' between each pair of from and to points. 841//' 842//' @noRd 843// [[Rcpp::export]] 844Rcpp::NumericVector rcpp_flows_disperse_par (const Rcpp::DataFrame graph, 845 const Rcpp::DataFrame vert_map_in, 846 Rcpp::IntegerVector fromi, 847 Rcpp::NumericVector k, 848 Rcpp::NumericVector dens, 849 const double &tol, 850 std::string heap_type) 851{ 852 const size_t nfrom = static_cast <size_t> (fromi.size ()); 853 854 std::vector <std::string> from = graph ["from"]; 855 std::vector <std::string> to = graph ["to"]; 856 std::vector <double> dist = graph ["d"]; 857 std::vector <double> wt = graph ["d_weighted"]; 858 859 size_t nedges = static_cast <size_t> (graph.nrow ()); 860 std::vector <std::string> vert_name = vert_map_in ["vert"]; 861 std::vector <size_t> vert_indx = vert_map_in ["id"]; 862 // Make map from vertex name to integer index 863 std::map <std::string, size_t> vert_map_i; 864 size_t nverts = run_sp::make_vert_map (vert_map_in, vert_name, 865 vert_indx, vert_map_i); 866 867 std::unordered_map <std::string, size_t> verts_to_edge_map; 868 std::unordered_map <std::string, double> verts_to_dist_map; 869 run_sp::make_vert_to_edge_maps (from, to, wt, verts_to_edge_map, verts_to_dist_map); 870 871 std::shared_ptr<DGraph> g = std::make_shared<DGraph> (nverts); 872 inst_graph (g, nedges, vert_map_i, from, to, dist, wt); 873 874 // Create parallel worker 875 OneDisperse oneDisperse (RcppParallel::RVector <int> (fromi), 876 RcppParallel::RVector <double> (dens), 877 vert_name, verts_to_edge_map, 878 nverts, nedges, 879 RcppParallel::RVector <double> (k), tol, heap_type, g); 880 881 size_t chunk_size = run_sp::get_chunk_size (nfrom); 882 RcppParallel::parallelReduce (0, nfrom, oneDisperse, chunk_size); 883 884 return Rcpp::wrap (oneDisperse.output); 885} 886 887 888//' rcpp_flows_si 889//' 890//' @param graph The data.frame holding the graph edges 891//' @param vert_map_in map from <std::string> vertex ID to (0-indexed) integer 892//' index of vertices 893//' @param fromi Index into vert_map_in of vertex numbers 894//' @param toi Index into vert_map_in of vertex numbers 895//' @param kvec Vector of k-values for each fromi 896//' @param nvec Vector of density-values for each fromi 897//' @param tol Relative tolerance in terms of flows below which targets 898//' (to-vertices) are not considered. 899//' 900//' @noRd 901// [[Rcpp::export]] 902Rcpp::NumericVector rcpp_flows_si (const Rcpp::DataFrame graph, 903 const Rcpp::DataFrame vert_map_in, 904 Rcpp::IntegerVector fromi, 905 Rcpp::IntegerVector toi_in, 906 Rcpp::NumericVector kvec, 907 Rcpp::NumericVector dens_from, 908 Rcpp::NumericVector dens_to, 909 const bool norm_sums, 910 const double tol, 911 const std::string heap_type) 912{ 913 std::vector <size_t> toi = 914 Rcpp::as <std::vector <size_t> > ( toi_in); 915 const size_t nfrom = static_cast <size_t> (fromi.size ()); 916 917 const std::vector <std::string> from = graph ["from"]; 918 const std::vector <std::string> to = graph ["to"]; 919 const std::vector <double> dist = graph ["d"]; 920 const std::vector <double> wt = graph ["d_weighted"]; 921 922 const size_t nedges = static_cast <size_t> (graph.nrow ()); 923 const std::vector <std::string> vert_name = vert_map_in ["vert"]; 924 const std::vector <size_t> vert_indx = vert_map_in ["id"]; 925 // Make map from vertex name to integer index 926 std::map <std::string, size_t> vert_map_i; 927 const size_t nverts = run_sp::make_vert_map (vert_map_in, vert_name, 928 vert_indx, vert_map_i); 929 930 std::unordered_map <std::string, size_t> verts_to_edge_map; 931 std::unordered_map <std::string, double> verts_to_dist_map; 932 run_sp::make_vert_to_edge_maps (from, to, wt, verts_to_edge_map, verts_to_dist_map); 933 934 std::shared_ptr <DGraph> g = std::make_shared <DGraph> (nverts); 935 inst_graph (g, nedges, vert_map_i, from, to, dist, wt); 936 937 // Create parallel worker 938 OneSI oneSI (RcppParallel::RVector <int> (fromi), toi, 939 RcppParallel::RVector <double> (kvec), 940 RcppParallel::RVector <double> (dens_from), 941 RcppParallel::RVector <double> (dens_to), 942 vert_name, verts_to_edge_map, 943 nverts, nedges, norm_sums, tol, heap_type, g); 944 945 size_t chunk_size = run_sp::get_chunk_size (nfrom); 946 RcppParallel::parallelReduce (0, nfrom, oneSI, chunk_size); 947 948 return Rcpp::wrap (oneSI.output); 949}