Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v5.8-rc3 1345 lines 40 kB view raw
1// SPDX-License-Identifier: GPL-2.0-only 2/* 3 * Intel MIC Platform Software Stack (MPSS) 4 * 5 * Copyright(c) 2014 Intel Corporation. 6 * 7 * Intel SCIF driver. 8 */ 9#include "../bus/scif_bus.h" 10#include "scif_peer_bus.h" 11#include "scif_main.h" 12#include "scif_nodeqp.h" 13#include "scif_map.h" 14 15/* 16 ************************************************************************ 17 * SCIF node Queue Pair (QP) setup flow: 18 * 19 * 1) SCIF driver gets probed with a scif_hw_dev via the scif_hw_bus 20 * 2) scif_setup_qp(..) allocates the local qp and calls 21 * scif_setup_qp_connect(..) which allocates and maps the local 22 * buffer for the inbound QP 23 * 3) The local node updates the device page with the DMA address of the QP 24 * 4) A delayed work is scheduled (qp_dwork) which periodically reads if 25 * the peer node has updated its QP DMA address 26 * 5) Once a valid non zero address is found in the QP DMA address field 27 * in the device page, the local node maps the remote node's QP, 28 * updates its outbound QP and sends a SCIF_INIT message to the peer 29 * 6) The SCIF_INIT message is received by the peer node QP interrupt bottom 30 * half handler by calling scif_init(..) 31 * 7) scif_init(..) registers a new SCIF peer node by calling 32 * scif_peer_register_device(..) which signifies the addition of a new 33 * SCIF node 34 * 8) On the mgmt node, P2P network setup/teardown is initiated if all the 35 * remote nodes are online via scif_p2p_setup(..) 36 * 9) For P2P setup, the host maps the remote nodes' aperture and memory 37 * bars and sends a SCIF_NODE_ADD message to both nodes 38 * 10) As part of scif_nodeadd, both nodes set up their local inbound 39 * QPs and send a SCIF_NODE_ADD_ACK to the mgmt node 40 * 11) As part of scif_node_add_ack(..) the mgmt node forwards the 41 * SCIF_NODE_ADD_ACK to the remote nodes 42 * 12) As part of scif_node_add_ack(..) the remote nodes update their 43 * outbound QPs, make sure they can access memory on the remote node 44 * and then add a new SCIF peer node by calling 45 * scif_peer_register_device(..) which signifies the addition of a new 46 * SCIF node. 47 * 13) The SCIF network is now established across all nodes. 48 * 49 ************************************************************************ 50 * SCIF node QP teardown flow (initiated by non mgmt node): 51 * 52 * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus 53 * 2) The device page QP DMA address field is updated with 0x0 54 * 3) A non mgmt node now cleans up all local data structures and sends a 55 * SCIF_EXIT message to the peer and waits for a SCIF_EXIT_ACK 56 * 4) As part of scif_exit(..) handling scif_disconnect_node(..) is called 57 * 5) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the 58 * peers and waits for a SCIF_NODE_REMOVE_ACK 59 * 6) As part of scif_node_remove(..) a remote node unregisters the peer 60 * node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK 61 * 7) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs 62 * it sends itself a node remove message whose handling cleans up local 63 * data structures and unregisters the peer node from the SCIF network 64 * 8) The mgmt node sends a SCIF_EXIT_ACK 65 * 9) Upon receipt of the SCIF_EXIT_ACK the node initiating the teardown 66 * completes the SCIF remove routine 67 * 10) The SCIF network is now torn down for the node initiating the 68 * teardown sequence 69 * 70 ************************************************************************ 71 * SCIF node QP teardown flow (initiated by mgmt node): 72 * 73 * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus 74 * 2) The device page QP DMA address field is updated with 0x0 75 * 3) The mgmt node calls scif_disconnect_node(..) 76 * 4) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the peers 77 * and waits for a SCIF_NODE_REMOVE_ACK 78 * 5) As part of scif_node_remove(..) a remote node unregisters the peer 79 * node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK 80 * 6) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs 81 * it unregisters the peer node from the SCIF network 82 * 7) The mgmt node sends a SCIF_EXIT message and waits for a SCIF_EXIT_ACK. 83 * 8) A non mgmt node upon receipt of a SCIF_EXIT message calls scif_stop(..) 84 * which would clean up local data structures for all SCIF nodes and 85 * then send a SCIF_EXIT_ACK back to the mgmt node 86 * 9) Upon receipt of the SCIF_EXIT_ACK the the mgmt node sends itself a node 87 * remove message whose handling cleans up local data structures and 88 * destroys any P2P mappings. 89 * 10) The SCIF hardware device for which a remove callback was received is now 90 * disconnected from the SCIF network. 91 */ 92/* 93 * Initializes "local" data structures for the QP. Allocates the QP 94 * ring buffer (rb) and initializes the "in bound" queue. 95 */ 96int scif_setup_qp_connect(struct scif_qp *qp, dma_addr_t *qp_offset, 97 int local_size, struct scif_dev *scifdev) 98{ 99 void *local_q = qp->inbound_q.rb_base; 100 int err = 0; 101 u32 tmp_rd = 0; 102 103 spin_lock_init(&qp->send_lock); 104 spin_lock_init(&qp->recv_lock); 105 106 /* Allocate rb only if not already allocated */ 107 if (!local_q) { 108 local_q = kzalloc(local_size, GFP_KERNEL); 109 if (!local_q) { 110 err = -ENOMEM; 111 return err; 112 } 113 } 114 115 err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size); 116 if (err) 117 goto kfree; 118 /* 119 * To setup the inbound_q, the buffer lives locally, the read pointer 120 * is remote and the write pointer is local. 121 */ 122 scif_rb_init(&qp->inbound_q, 123 &tmp_rd, 124 &qp->local_write, 125 local_q, get_count_order(local_size)); 126 /* 127 * The read pointer is NULL initially and it is unsafe to use the ring 128 * buffer til this changes! 129 */ 130 qp->inbound_q.read_ptr = NULL; 131 err = scif_map_single(qp_offset, qp, 132 scifdev, sizeof(struct scif_qp)); 133 if (err) 134 goto unmap; 135 qp->local_qp = *qp_offset; 136 return err; 137unmap: 138 scif_unmap_single(qp->local_buf, scifdev, local_size); 139 qp->local_buf = 0; 140kfree: 141 kfree(local_q); 142 return err; 143} 144 145/* When the other side has already done it's allocation, this is called */ 146int scif_setup_qp_accept(struct scif_qp *qp, dma_addr_t *qp_offset, 147 dma_addr_t phys, int local_size, 148 struct scif_dev *scifdev) 149{ 150 void *local_q; 151 void *remote_q; 152 struct scif_qp *remote_qp; 153 int remote_size; 154 int err = 0; 155 156 spin_lock_init(&qp->send_lock); 157 spin_lock_init(&qp->recv_lock); 158 /* Start by figuring out where we need to point */ 159 remote_qp = scif_ioremap(phys, sizeof(struct scif_qp), scifdev); 160 if (!remote_qp) 161 return -EIO; 162 qp->remote_qp = remote_qp; 163 if (qp->remote_qp->magic != SCIFEP_MAGIC) { 164 err = -EIO; 165 goto iounmap; 166 } 167 qp->remote_buf = remote_qp->local_buf; 168 remote_size = qp->remote_qp->inbound_q.size; 169 remote_q = scif_ioremap(qp->remote_buf, remote_size, scifdev); 170 if (!remote_q) { 171 err = -EIO; 172 goto iounmap; 173 } 174 qp->remote_qp->local_write = 0; 175 /* 176 * To setup the outbound_q, the buffer lives in remote memory, 177 * the read pointer is local, the write pointer is remote 178 */ 179 scif_rb_init(&qp->outbound_q, 180 &qp->local_read, 181 &qp->remote_qp->local_write, 182 remote_q, 183 get_count_order(remote_size)); 184 local_q = kzalloc(local_size, GFP_KERNEL); 185 if (!local_q) { 186 err = -ENOMEM; 187 goto iounmap_1; 188 } 189 err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size); 190 if (err) 191 goto kfree; 192 qp->remote_qp->local_read = 0; 193 /* 194 * To setup the inbound_q, the buffer lives locally, the read pointer 195 * is remote and the write pointer is local 196 */ 197 scif_rb_init(&qp->inbound_q, 198 &qp->remote_qp->local_read, 199 &qp->local_write, 200 local_q, get_count_order(local_size)); 201 err = scif_map_single(qp_offset, qp, scifdev, 202 sizeof(struct scif_qp)); 203 if (err) 204 goto unmap; 205 qp->local_qp = *qp_offset; 206 return err; 207unmap: 208 scif_unmap_single(qp->local_buf, scifdev, local_size); 209 qp->local_buf = 0; 210kfree: 211 kfree(local_q); 212iounmap_1: 213 scif_iounmap(remote_q, remote_size, scifdev); 214 qp->outbound_q.rb_base = NULL; 215iounmap: 216 scif_iounmap(qp->remote_qp, sizeof(struct scif_qp), scifdev); 217 qp->remote_qp = NULL; 218 return err; 219} 220 221int scif_setup_qp_connect_response(struct scif_dev *scifdev, 222 struct scif_qp *qp, u64 payload) 223{ 224 int err = 0; 225 void *r_buf; 226 int remote_size; 227 phys_addr_t tmp_phys; 228 229 qp->remote_qp = scif_ioremap(payload, sizeof(struct scif_qp), scifdev); 230 231 if (!qp->remote_qp) { 232 err = -ENOMEM; 233 goto error; 234 } 235 236 if (qp->remote_qp->magic != SCIFEP_MAGIC) { 237 dev_err(&scifdev->sdev->dev, 238 "SCIFEP_MAGIC mismatch between self %d remote %d\n", 239 scif_dev[scif_info.nodeid].node, scifdev->node); 240 err = -ENODEV; 241 goto error; 242 } 243 244 tmp_phys = qp->remote_qp->local_buf; 245 remote_size = qp->remote_qp->inbound_q.size; 246 r_buf = scif_ioremap(tmp_phys, remote_size, scifdev); 247 248 if (!r_buf) 249 return -EIO; 250 251 qp->local_read = 0; 252 scif_rb_init(&qp->outbound_q, 253 &qp->local_read, 254 &qp->remote_qp->local_write, 255 r_buf, 256 get_count_order(remote_size)); 257 /* 258 * Because the node QP may already be processing an INIT message, set 259 * the read pointer so the cached read offset isn't lost 260 */ 261 qp->remote_qp->local_read = qp->inbound_q.current_read_offset; 262 /* 263 * resetup the inbound_q now that we know where the 264 * inbound_read really is. 265 */ 266 scif_rb_init(&qp->inbound_q, 267 &qp->remote_qp->local_read, 268 &qp->local_write, 269 qp->inbound_q.rb_base, 270 get_count_order(qp->inbound_q.size)); 271error: 272 return err; 273} 274 275static __always_inline void 276scif_send_msg_intr(struct scif_dev *scifdev) 277{ 278 struct scif_hw_dev *sdev = scifdev->sdev; 279 280 if (scifdev_is_p2p(scifdev)) 281 sdev->hw_ops->send_p2p_intr(sdev, scifdev->rdb, &scifdev->mmio); 282 else 283 sdev->hw_ops->send_intr(sdev, scifdev->rdb); 284} 285 286int scif_qp_response(phys_addr_t phys, struct scif_dev *scifdev) 287{ 288 int err = 0; 289 struct scifmsg msg; 290 291 err = scif_setup_qp_connect_response(scifdev, scifdev->qpairs, phys); 292 if (!err) { 293 /* 294 * Now that everything is setup and mapped, we're ready 295 * to tell the peer about our queue's location 296 */ 297 msg.uop = SCIF_INIT; 298 msg.dst.node = scifdev->node; 299 err = scif_nodeqp_send(scifdev, &msg); 300 } 301 return err; 302} 303 304void scif_send_exit(struct scif_dev *scifdev) 305{ 306 struct scifmsg msg; 307 int ret; 308 309 scifdev->exit = OP_IN_PROGRESS; 310 msg.uop = SCIF_EXIT; 311 msg.src.node = scif_info.nodeid; 312 msg.dst.node = scifdev->node; 313 ret = scif_nodeqp_send(scifdev, &msg); 314 if (ret) 315 goto done; 316 /* Wait for a SCIF_EXIT_ACK message */ 317 wait_event_timeout(scif_info.exitwq, scifdev->exit == OP_COMPLETED, 318 SCIF_NODE_ALIVE_TIMEOUT); 319done: 320 scifdev->exit = OP_IDLE; 321} 322 323int scif_setup_qp(struct scif_dev *scifdev) 324{ 325 int err = 0; 326 int local_size; 327 struct scif_qp *qp; 328 329 local_size = SCIF_NODE_QP_SIZE; 330 331 qp = kzalloc(sizeof(*qp), GFP_KERNEL); 332 if (!qp) { 333 err = -ENOMEM; 334 return err; 335 } 336 qp->magic = SCIFEP_MAGIC; 337 scifdev->qpairs = qp; 338 err = scif_setup_qp_connect(qp, &scifdev->qp_dma_addr, 339 local_size, scifdev); 340 if (err) 341 goto free_qp; 342 /* 343 * We're as setup as we can be. The inbound_q is setup, w/o a usable 344 * outbound q. When we get a message, the read_ptr will be updated, 345 * and we will pull the message. 346 */ 347 return err; 348free_qp: 349 kfree(scifdev->qpairs); 350 scifdev->qpairs = NULL; 351 return err; 352} 353 354static void scif_p2p_freesg(struct scatterlist *sg) 355{ 356 kfree(sg); 357} 358 359static struct scatterlist * 360scif_p2p_setsg(phys_addr_t pa, int page_size, int page_cnt) 361{ 362 struct scatterlist *sg; 363 struct page *page; 364 int i; 365 366 sg = kcalloc(page_cnt, sizeof(struct scatterlist), GFP_KERNEL); 367 if (!sg) 368 return NULL; 369 sg_init_table(sg, page_cnt); 370 for (i = 0; i < page_cnt; i++) { 371 page = pfn_to_page(pa >> PAGE_SHIFT); 372 sg_set_page(&sg[i], page, page_size, 0); 373 pa += page_size; 374 } 375 return sg; 376} 377 378/* Init p2p mappings required to access peerdev from scifdev */ 379static struct scif_p2p_info * 380scif_init_p2p_info(struct scif_dev *scifdev, struct scif_dev *peerdev) 381{ 382 struct scif_p2p_info *p2p; 383 int num_mmio_pages, num_aper_pages, sg_page_shift, err, num_aper_chunks; 384 struct scif_hw_dev *psdev = peerdev->sdev; 385 struct scif_hw_dev *sdev = scifdev->sdev; 386 387 num_mmio_pages = psdev->mmio->len >> PAGE_SHIFT; 388 num_aper_pages = psdev->aper->len >> PAGE_SHIFT; 389 390 p2p = kzalloc(sizeof(*p2p), GFP_KERNEL); 391 if (!p2p) 392 return NULL; 393 p2p->ppi_sg[SCIF_PPI_MMIO] = scif_p2p_setsg(psdev->mmio->pa, 394 PAGE_SIZE, num_mmio_pages); 395 if (!p2p->ppi_sg[SCIF_PPI_MMIO]) 396 goto free_p2p; 397 p2p->sg_nentries[SCIF_PPI_MMIO] = num_mmio_pages; 398 sg_page_shift = get_order(min(psdev->aper->len, (u64)(1 << 30))); 399 num_aper_chunks = num_aper_pages >> (sg_page_shift - PAGE_SHIFT); 400 p2p->ppi_sg[SCIF_PPI_APER] = scif_p2p_setsg(psdev->aper->pa, 401 1 << sg_page_shift, 402 num_aper_chunks); 403 p2p->sg_nentries[SCIF_PPI_APER] = num_aper_chunks; 404 err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO], 405 num_mmio_pages, PCI_DMA_BIDIRECTIONAL); 406 if (err != num_mmio_pages) 407 goto scif_p2p_free; 408 err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER], 409 num_aper_chunks, PCI_DMA_BIDIRECTIONAL); 410 if (err != num_aper_chunks) 411 goto dma_unmap; 412 p2p->ppi_da[SCIF_PPI_MMIO] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_MMIO]); 413 p2p->ppi_da[SCIF_PPI_APER] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_APER]); 414 p2p->ppi_len[SCIF_PPI_MMIO] = num_mmio_pages; 415 p2p->ppi_len[SCIF_PPI_APER] = num_aper_pages; 416 p2p->ppi_peer_id = peerdev->node; 417 return p2p; 418dma_unmap: 419 dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO], 420 p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL); 421scif_p2p_free: 422 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]); 423 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]); 424free_p2p: 425 kfree(p2p); 426 return NULL; 427} 428 429/* Uninitialize and release resources from a p2p mapping */ 430static void scif_deinit_p2p_info(struct scif_dev *scifdev, 431 struct scif_p2p_info *p2p) 432{ 433 struct scif_hw_dev *sdev = scifdev->sdev; 434 435 dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO], 436 p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL); 437 dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER], 438 p2p->sg_nentries[SCIF_PPI_APER], DMA_BIDIRECTIONAL); 439 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]); 440 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]); 441 kfree(p2p); 442} 443 444/** 445 * scif_node_connect: Respond to SCIF_NODE_CONNECT interrupt message 446 * @dst: Destination node 447 * 448 * Connect the src and dst node by setting up the p2p connection 449 * between them. Management node here acts like a proxy. 450 */ 451static void scif_node_connect(struct scif_dev *scifdev, int dst) 452{ 453 struct scif_dev *dev_j = scifdev; 454 struct scif_dev *dev_i = NULL; 455 struct scif_p2p_info *p2p_ij = NULL; /* bus addr for j from i */ 456 struct scif_p2p_info *p2p_ji = NULL; /* bus addr for i from j */ 457 struct scif_p2p_info *p2p; 458 struct list_head *pos, *tmp; 459 struct scifmsg msg; 460 int err; 461 u64 tmppayload; 462 463 if (dst < 1 || dst > scif_info.maxid) 464 return; 465 466 dev_i = &scif_dev[dst]; 467 468 if (!_scifdev_alive(dev_i)) 469 return; 470 /* 471 * If the p2p connection is already setup or in the process of setting 472 * up then just ignore this request. The requested node will get 473 * informed by SCIF_NODE_ADD_ACK or SCIF_NODE_ADD_NACK 474 */ 475 if (!list_empty(&dev_i->p2p)) { 476 list_for_each_safe(pos, tmp, &dev_i->p2p) { 477 p2p = list_entry(pos, struct scif_p2p_info, ppi_list); 478 if (p2p->ppi_peer_id == dev_j->node) 479 return; 480 } 481 } 482 p2p_ij = scif_init_p2p_info(dev_i, dev_j); 483 if (!p2p_ij) 484 return; 485 p2p_ji = scif_init_p2p_info(dev_j, dev_i); 486 if (!p2p_ji) { 487 scif_deinit_p2p_info(dev_i, p2p_ij); 488 return; 489 } 490 list_add_tail(&p2p_ij->ppi_list, &dev_i->p2p); 491 list_add_tail(&p2p_ji->ppi_list, &dev_j->p2p); 492 493 /* 494 * Send a SCIF_NODE_ADD to dev_i, pass it its bus address 495 * as seen from dev_j 496 */ 497 msg.uop = SCIF_NODE_ADD; 498 msg.src.node = dev_j->node; 499 msg.dst.node = dev_i->node; 500 501 msg.payload[0] = p2p_ji->ppi_da[SCIF_PPI_APER]; 502 msg.payload[1] = p2p_ij->ppi_da[SCIF_PPI_MMIO]; 503 msg.payload[2] = p2p_ij->ppi_da[SCIF_PPI_APER]; 504 msg.payload[3] = p2p_ij->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT; 505 506 err = scif_nodeqp_send(dev_i, &msg); 507 if (err) { 508 dev_err(&scifdev->sdev->dev, 509 "%s %d error %d\n", __func__, __LINE__, err); 510 return; 511 } 512 513 /* Same as above but to dev_j */ 514 msg.uop = SCIF_NODE_ADD; 515 msg.src.node = dev_i->node; 516 msg.dst.node = dev_j->node; 517 518 tmppayload = msg.payload[0]; 519 msg.payload[0] = msg.payload[2]; 520 msg.payload[2] = tmppayload; 521 msg.payload[1] = p2p_ji->ppi_da[SCIF_PPI_MMIO]; 522 msg.payload[3] = p2p_ji->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT; 523 524 scif_nodeqp_send(dev_j, &msg); 525} 526 527static void scif_p2p_setup(void) 528{ 529 int i, j; 530 531 if (!scif_info.p2p_enable) 532 return; 533 534 for (i = 1; i <= scif_info.maxid; i++) 535 if (!_scifdev_alive(&scif_dev[i])) 536 return; 537 538 for (i = 1; i <= scif_info.maxid; i++) { 539 for (j = 1; j <= scif_info.maxid; j++) { 540 struct scif_dev *scifdev = &scif_dev[i]; 541 542 if (i == j) 543 continue; 544 scif_node_connect(scifdev, j); 545 } 546 } 547} 548 549static char *message_types[] = {"BAD", 550 "INIT", 551 "EXIT", 552 "SCIF_EXIT_ACK", 553 "SCIF_NODE_ADD", 554 "SCIF_NODE_ADD_ACK", 555 "SCIF_NODE_ADD_NACK", 556 "REMOVE_NODE", 557 "REMOVE_NODE_ACK", 558 "CNCT_REQ", 559 "CNCT_GNT", 560 "CNCT_GNTACK", 561 "CNCT_GNTNACK", 562 "CNCT_REJ", 563 "DISCNCT", 564 "DISCNT_ACK", 565 "CLIENT_SENT", 566 "CLIENT_RCVD", 567 "SCIF_GET_NODE_INFO", 568 "REGISTER", 569 "REGISTER_ACK", 570 "REGISTER_NACK", 571 "UNREGISTER", 572 "UNREGISTER_ACK", 573 "UNREGISTER_NACK", 574 "ALLOC_REQ", 575 "ALLOC_GNT", 576 "ALLOC_REJ", 577 "FREE_PHYS", 578 "FREE_VIRT", 579 "MUNMAP", 580 "MARK", 581 "MARK_ACK", 582 "MARK_NACK", 583 "WAIT", 584 "WAIT_ACK", 585 "WAIT_NACK", 586 "SIGNAL_LOCAL", 587 "SIGNAL_REMOTE", 588 "SIG_ACK", 589 "SIG_NACK"}; 590 591static void 592scif_display_message(struct scif_dev *scifdev, struct scifmsg *msg, 593 const char *label) 594{ 595 if (!scif_info.en_msg_log) 596 return; 597 if (msg->uop > SCIF_MAX_MSG) { 598 dev_err(&scifdev->sdev->dev, 599 "%s: unknown msg type %d\n", label, msg->uop); 600 return; 601 } 602 dev_info(&scifdev->sdev->dev, 603 "%s: msg type %s, src %d:%d, dest %d:%d payload 0x%llx:0x%llx:0x%llx:0x%llx\n", 604 label, message_types[msg->uop], msg->src.node, msg->src.port, 605 msg->dst.node, msg->dst.port, msg->payload[0], msg->payload[1], 606 msg->payload[2], msg->payload[3]); 607} 608 609int _scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg) 610{ 611 struct scif_qp *qp = scifdev->qpairs; 612 int err = -ENOMEM, loop_cnt = 0; 613 614 scif_display_message(scifdev, msg, "Sent"); 615 if (!qp) { 616 err = -EINVAL; 617 goto error; 618 } 619 spin_lock(&qp->send_lock); 620 621 while ((err = scif_rb_write(&qp->outbound_q, 622 msg, sizeof(struct scifmsg)))) { 623 mdelay(1); 624#define SCIF_NODEQP_SEND_TO_MSEC (3 * 1000) 625 if (loop_cnt++ > (SCIF_NODEQP_SEND_TO_MSEC)) { 626 err = -ENODEV; 627 break; 628 } 629 } 630 if (!err) 631 scif_rb_commit(&qp->outbound_q); 632 spin_unlock(&qp->send_lock); 633 if (!err) { 634 if (scifdev_self(scifdev)) 635 /* 636 * For loopback we need to emulate an interrupt by 637 * queuing work for the queue handling real node 638 * Qp interrupts. 639 */ 640 queue_work(scifdev->intr_wq, &scifdev->intr_bh); 641 else 642 scif_send_msg_intr(scifdev); 643 } 644error: 645 if (err) 646 dev_dbg(&scifdev->sdev->dev, 647 "%s %d error %d uop %d\n", 648 __func__, __LINE__, err, msg->uop); 649 return err; 650} 651 652/** 653 * scif_nodeqp_send - Send a message on the node queue pair 654 * @scifdev: Scif Device. 655 * @msg: The message to be sent. 656 */ 657int scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg) 658{ 659 int err; 660 struct device *spdev = NULL; 661 662 if (msg->uop > SCIF_EXIT_ACK) { 663 /* Don't send messages once the exit flow has begun */ 664 if (OP_IDLE != scifdev->exit) 665 return -ENODEV; 666 spdev = scif_get_peer_dev(scifdev); 667 if (IS_ERR(spdev)) { 668 err = PTR_ERR(spdev); 669 return err; 670 } 671 } 672 err = _scif_nodeqp_send(scifdev, msg); 673 if (msg->uop > SCIF_EXIT_ACK) 674 scif_put_peer_dev(spdev); 675 return err; 676} 677 678/* 679 * scif_misc_handler: 680 * 681 * Work queue handler for servicing miscellaneous SCIF tasks. 682 * Examples include: 683 * 1) Remote fence requests. 684 * 2) Destruction of temporary registered windows 685 * created during scif_vreadfrom()/scif_vwriteto(). 686 * 3) Cleanup of zombie endpoints. 687 */ 688void scif_misc_handler(struct work_struct *work) 689{ 690 scif_rma_handle_remote_fences(); 691 scif_rma_destroy_windows(); 692 scif_rma_destroy_tcw_invalid(); 693 scif_cleanup_zombie_epd(); 694} 695 696/** 697 * scif_init() - Respond to SCIF_INIT interrupt message 698 * @scifdev: Remote SCIF device node 699 * @msg: Interrupt message 700 */ 701static __always_inline void 702scif_init(struct scif_dev *scifdev, struct scifmsg *msg) 703{ 704 /* 705 * Allow the thread waiting for device page updates for the peer QP DMA 706 * address to complete initializing the inbound_q. 707 */ 708 flush_delayed_work(&scifdev->qp_dwork); 709 710 scif_peer_register_device(scifdev); 711 712 if (scif_is_mgmt_node()) { 713 mutex_lock(&scif_info.conflock); 714 scif_p2p_setup(); 715 mutex_unlock(&scif_info.conflock); 716 } 717} 718 719/** 720 * scif_exit() - Respond to SCIF_EXIT interrupt message 721 * @scifdev: Remote SCIF device node 722 * @msg: Interrupt message 723 * 724 * This function stops the SCIF interface for the node which sent 725 * the SCIF_EXIT message and starts waiting for that node to 726 * resetup the queue pair again. 727 */ 728static __always_inline void 729scif_exit(struct scif_dev *scifdev, struct scifmsg *unused) 730{ 731 scifdev->exit_ack_pending = true; 732 if (scif_is_mgmt_node()) 733 scif_disconnect_node(scifdev->node, false); 734 else 735 scif_stop(scifdev); 736 schedule_delayed_work(&scifdev->qp_dwork, 737 msecs_to_jiffies(1000)); 738} 739 740/** 741 * scif_exitack() - Respond to SCIF_EXIT_ACK interrupt message 742 * @scifdev: Remote SCIF device node 743 * @msg: Interrupt message 744 * 745 */ 746static __always_inline void 747scif_exit_ack(struct scif_dev *scifdev, struct scifmsg *unused) 748{ 749 scifdev->exit = OP_COMPLETED; 750 wake_up(&scif_info.exitwq); 751} 752 753/** 754 * scif_node_add() - Respond to SCIF_NODE_ADD interrupt message 755 * @scifdev: Remote SCIF device node 756 * @msg: Interrupt message 757 * 758 * When the mgmt node driver has finished initializing a MIC node queue pair it 759 * marks the node as online. It then looks for all currently online MIC cards 760 * and send a SCIF_NODE_ADD message to identify the ID of the new card for 761 * peer to peer initialization 762 * 763 * The local node allocates its incoming queue and sends its address in the 764 * SCIF_NODE_ADD_ACK message back to the mgmt node, the mgmt node "reflects" 765 * this message to the new node 766 */ 767static __always_inline void 768scif_node_add(struct scif_dev *scifdev, struct scifmsg *msg) 769{ 770 struct scif_dev *newdev; 771 dma_addr_t qp_offset; 772 int qp_connect; 773 struct scif_hw_dev *sdev; 774 775 dev_dbg(&scifdev->sdev->dev, 776 "Scifdev %d:%d received NODE_ADD msg for node %d\n", 777 scifdev->node, msg->dst.node, msg->src.node); 778 dev_dbg(&scifdev->sdev->dev, 779 "Remote address for this node's aperture %llx\n", 780 msg->payload[0]); 781 newdev = &scif_dev[msg->src.node]; 782 newdev->node = msg->src.node; 783 newdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev; 784 sdev = newdev->sdev; 785 786 if (scif_setup_intr_wq(newdev)) { 787 dev_err(&scifdev->sdev->dev, 788 "failed to setup interrupts for %d\n", msg->src.node); 789 goto interrupt_setup_error; 790 } 791 newdev->mmio.va = ioremap(msg->payload[1], sdev->mmio->len); 792 if (!newdev->mmio.va) { 793 dev_err(&scifdev->sdev->dev, 794 "failed to map mmio for %d\n", msg->src.node); 795 goto mmio_map_error; 796 } 797 newdev->qpairs = kzalloc(sizeof(*newdev->qpairs), GFP_KERNEL); 798 if (!newdev->qpairs) 799 goto qp_alloc_error; 800 /* 801 * Set the base address of the remote node's memory since it gets 802 * added to qp_offset 803 */ 804 newdev->base_addr = msg->payload[0]; 805 806 qp_connect = scif_setup_qp_connect(newdev->qpairs, &qp_offset, 807 SCIF_NODE_QP_SIZE, newdev); 808 if (qp_connect) { 809 dev_err(&scifdev->sdev->dev, 810 "failed to setup qp_connect %d\n", qp_connect); 811 goto qp_connect_error; 812 } 813 814 newdev->db = sdev->hw_ops->next_db(sdev); 815 newdev->cookie = sdev->hw_ops->request_irq(sdev, scif_intr_handler, 816 "SCIF_INTR", newdev, 817 newdev->db); 818 if (IS_ERR(newdev->cookie)) 819 goto qp_connect_error; 820 newdev->qpairs->magic = SCIFEP_MAGIC; 821 newdev->qpairs->qp_state = SCIF_QP_OFFLINE; 822 823 msg->uop = SCIF_NODE_ADD_ACK; 824 msg->dst.node = msg->src.node; 825 msg->src.node = scif_info.nodeid; 826 msg->payload[0] = qp_offset; 827 msg->payload[2] = newdev->db; 828 scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg); 829 return; 830qp_connect_error: 831 kfree(newdev->qpairs); 832 newdev->qpairs = NULL; 833qp_alloc_error: 834 iounmap(newdev->mmio.va); 835 newdev->mmio.va = NULL; 836mmio_map_error: 837interrupt_setup_error: 838 dev_err(&scifdev->sdev->dev, 839 "node add failed for node %d\n", msg->src.node); 840 msg->uop = SCIF_NODE_ADD_NACK; 841 msg->dst.node = msg->src.node; 842 msg->src.node = scif_info.nodeid; 843 scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg); 844} 845 846void scif_poll_qp_state(struct work_struct *work) 847{ 848#define SCIF_NODE_QP_RETRY 100 849#define SCIF_NODE_QP_TIMEOUT 100 850 struct scif_dev *peerdev = container_of(work, struct scif_dev, 851 p2p_dwork.work); 852 struct scif_qp *qp = &peerdev->qpairs[0]; 853 854 if (qp->qp_state != SCIF_QP_ONLINE || 855 qp->remote_qp->qp_state != SCIF_QP_ONLINE) { 856 if (peerdev->p2p_retry++ == SCIF_NODE_QP_RETRY) { 857 dev_err(&peerdev->sdev->dev, 858 "Warning: QP check timeout with state %d\n", 859 qp->qp_state); 860 goto timeout; 861 } 862 schedule_delayed_work(&peerdev->p2p_dwork, 863 msecs_to_jiffies(SCIF_NODE_QP_TIMEOUT)); 864 return; 865 } 866 return; 867timeout: 868 dev_err(&peerdev->sdev->dev, 869 "%s %d remote node %d offline, state = 0x%x\n", 870 __func__, __LINE__, peerdev->node, qp->qp_state); 871 qp->remote_qp->qp_state = SCIF_QP_OFFLINE; 872 scif_peer_unregister_device(peerdev); 873 scif_cleanup_scifdev(peerdev); 874} 875 876/** 877 * scif_node_add_ack() - Respond to SCIF_NODE_ADD_ACK interrupt message 878 * @scifdev: Remote SCIF device node 879 * @msg: Interrupt message 880 * 881 * After a MIC node receives the SCIF_NODE_ADD_ACK message it send this 882 * message to the mgmt node to confirm the sequence is finished. 883 * 884 */ 885static __always_inline void 886scif_node_add_ack(struct scif_dev *scifdev, struct scifmsg *msg) 887{ 888 struct scif_dev *peerdev; 889 struct scif_qp *qp; 890 struct scif_dev *dst_dev = &scif_dev[msg->dst.node]; 891 892 dev_dbg(&scifdev->sdev->dev, 893 "Scifdev %d received SCIF_NODE_ADD_ACK msg src %d dst %d\n", 894 scifdev->node, msg->src.node, msg->dst.node); 895 dev_dbg(&scifdev->sdev->dev, 896 "payload %llx %llx %llx %llx\n", msg->payload[0], 897 msg->payload[1], msg->payload[2], msg->payload[3]); 898 if (scif_is_mgmt_node()) { 899 /* 900 * the lock serializes with scif_qp_response_ack. The mgmt node 901 * is forwarding the NODE_ADD_ACK message from src to dst we 902 * need to make sure that the dst has already received a 903 * NODE_ADD for src and setup its end of the qp to dst 904 */ 905 mutex_lock(&scif_info.conflock); 906 msg->payload[1] = scif_info.maxid; 907 scif_nodeqp_send(dst_dev, msg); 908 mutex_unlock(&scif_info.conflock); 909 return; 910 } 911 peerdev = &scif_dev[msg->src.node]; 912 peerdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev; 913 peerdev->node = msg->src.node; 914 915 qp = &peerdev->qpairs[0]; 916 917 if ((scif_setup_qp_connect_response(peerdev, &peerdev->qpairs[0], 918 msg->payload[0]))) 919 goto local_error; 920 peerdev->rdb = msg->payload[2]; 921 qp->remote_qp->qp_state = SCIF_QP_ONLINE; 922 923 scif_peer_register_device(peerdev); 924 925 schedule_delayed_work(&peerdev->p2p_dwork, 0); 926 return; 927local_error: 928 scif_cleanup_scifdev(peerdev); 929} 930 931/** 932 * scif_node_add_nack: Respond to SCIF_NODE_ADD_NACK interrupt message 933 * @msg: Interrupt message 934 * 935 * SCIF_NODE_ADD failed, so inform the waiting wq. 936 */ 937static __always_inline void 938scif_node_add_nack(struct scif_dev *scifdev, struct scifmsg *msg) 939{ 940 if (scif_is_mgmt_node()) { 941 struct scif_dev *dst_dev = &scif_dev[msg->dst.node]; 942 943 dev_dbg(&scifdev->sdev->dev, 944 "SCIF_NODE_ADD_NACK received from %d\n", scifdev->node); 945 scif_nodeqp_send(dst_dev, msg); 946 } 947} 948 949/* 950 * scif_node_remove: Handle SCIF_NODE_REMOVE message 951 * @msg: Interrupt message 952 * 953 * Handle node removal. 954 */ 955static __always_inline void 956scif_node_remove(struct scif_dev *scifdev, struct scifmsg *msg) 957{ 958 int node = msg->payload[0]; 959 struct scif_dev *scdev = &scif_dev[node]; 960 961 scdev->node_remove_ack_pending = true; 962 scif_handle_remove_node(node); 963} 964 965/* 966 * scif_node_remove_ack: Handle SCIF_NODE_REMOVE_ACK message 967 * @msg: Interrupt message 968 * 969 * The peer has acked a SCIF_NODE_REMOVE message. 970 */ 971static __always_inline void 972scif_node_remove_ack(struct scif_dev *scifdev, struct scifmsg *msg) 973{ 974 struct scif_dev *sdev = &scif_dev[msg->payload[0]]; 975 976 atomic_inc(&sdev->disconn_rescnt); 977 wake_up(&sdev->disconn_wq); 978} 979 980/** 981 * scif_get_node_info: Respond to SCIF_GET_NODE_INFO interrupt message 982 * @msg: Interrupt message 983 * 984 * Retrieve node info i.e maxid and total from the mgmt node. 985 */ 986static __always_inline void 987scif_get_node_info_resp(struct scif_dev *scifdev, struct scifmsg *msg) 988{ 989 if (scif_is_mgmt_node()) { 990 swap(msg->dst.node, msg->src.node); 991 mutex_lock(&scif_info.conflock); 992 msg->payload[1] = scif_info.maxid; 993 msg->payload[2] = scif_info.total; 994 mutex_unlock(&scif_info.conflock); 995 scif_nodeqp_send(scifdev, msg); 996 } else { 997 struct completion *node_info = 998 (struct completion *)msg->payload[3]; 999 1000 mutex_lock(&scif_info.conflock); 1001 scif_info.maxid = msg->payload[1]; 1002 scif_info.total = msg->payload[2]; 1003 complete_all(node_info); 1004 mutex_unlock(&scif_info.conflock); 1005 } 1006} 1007 1008static void 1009scif_msg_unknown(struct scif_dev *scifdev, struct scifmsg *msg) 1010{ 1011 /* Bogus Node Qp Message? */ 1012 dev_err(&scifdev->sdev->dev, 1013 "Unknown message 0x%xn scifdev->node 0x%x\n", 1014 msg->uop, scifdev->node); 1015} 1016 1017static void (*scif_intr_func[SCIF_MAX_MSG + 1]) 1018 (struct scif_dev *, struct scifmsg *msg) = { 1019 scif_msg_unknown, /* Error */ 1020 scif_init, /* SCIF_INIT */ 1021 scif_exit, /* SCIF_EXIT */ 1022 scif_exit_ack, /* SCIF_EXIT_ACK */ 1023 scif_node_add, /* SCIF_NODE_ADD */ 1024 scif_node_add_ack, /* SCIF_NODE_ADD_ACK */ 1025 scif_node_add_nack, /* SCIF_NODE_ADD_NACK */ 1026 scif_node_remove, /* SCIF_NODE_REMOVE */ 1027 scif_node_remove_ack, /* SCIF_NODE_REMOVE_ACK */ 1028 scif_cnctreq, /* SCIF_CNCT_REQ */ 1029 scif_cnctgnt, /* SCIF_CNCT_GNT */ 1030 scif_cnctgnt_ack, /* SCIF_CNCT_GNTACK */ 1031 scif_cnctgnt_nack, /* SCIF_CNCT_GNTNACK */ 1032 scif_cnctrej, /* SCIF_CNCT_REJ */ 1033 scif_discnct, /* SCIF_DISCNCT */ 1034 scif_discnt_ack, /* SCIF_DISCNT_ACK */ 1035 scif_clientsend, /* SCIF_CLIENT_SENT */ 1036 scif_clientrcvd, /* SCIF_CLIENT_RCVD */ 1037 scif_get_node_info_resp,/* SCIF_GET_NODE_INFO */ 1038 scif_recv_reg, /* SCIF_REGISTER */ 1039 scif_recv_reg_ack, /* SCIF_REGISTER_ACK */ 1040 scif_recv_reg_nack, /* SCIF_REGISTER_NACK */ 1041 scif_recv_unreg, /* SCIF_UNREGISTER */ 1042 scif_recv_unreg_ack, /* SCIF_UNREGISTER_ACK */ 1043 scif_recv_unreg_nack, /* SCIF_UNREGISTER_NACK */ 1044 scif_alloc_req, /* SCIF_ALLOC_REQ */ 1045 scif_alloc_gnt_rej, /* SCIF_ALLOC_GNT */ 1046 scif_alloc_gnt_rej, /* SCIF_ALLOC_REJ */ 1047 scif_free_virt, /* SCIF_FREE_VIRT */ 1048 scif_recv_munmap, /* SCIF_MUNMAP */ 1049 scif_recv_mark, /* SCIF_MARK */ 1050 scif_recv_mark_resp, /* SCIF_MARK_ACK */ 1051 scif_recv_mark_resp, /* SCIF_MARK_NACK */ 1052 scif_recv_wait, /* SCIF_WAIT */ 1053 scif_recv_wait_resp, /* SCIF_WAIT_ACK */ 1054 scif_recv_wait_resp, /* SCIF_WAIT_NACK */ 1055 scif_recv_sig_local, /* SCIF_SIG_LOCAL */ 1056 scif_recv_sig_remote, /* SCIF_SIG_REMOTE */ 1057 scif_recv_sig_resp, /* SCIF_SIG_ACK */ 1058 scif_recv_sig_resp, /* SCIF_SIG_NACK */ 1059}; 1060 1061/** 1062 * scif_nodeqp_msg_handler() - Common handler for node messages 1063 * @scifdev: Remote device to respond to 1064 * @qp: Remote memory pointer 1065 * @msg: The message to be handled. 1066 * 1067 * This routine calls the appropriate routine to handle a Node Qp 1068 * message receipt 1069 */ 1070static int scif_max_msg_id = SCIF_MAX_MSG; 1071 1072static void 1073scif_nodeqp_msg_handler(struct scif_dev *scifdev, 1074 struct scif_qp *qp, struct scifmsg *msg) 1075{ 1076 scif_display_message(scifdev, msg, "Rcvd"); 1077 1078 if (msg->uop > (u32)scif_max_msg_id) { 1079 /* Bogus Node Qp Message? */ 1080 dev_err(&scifdev->sdev->dev, 1081 "Unknown message 0x%xn scifdev->node 0x%x\n", 1082 msg->uop, scifdev->node); 1083 return; 1084 } 1085 1086 scif_intr_func[msg->uop](scifdev, msg); 1087} 1088 1089/** 1090 * scif_nodeqp_intrhandler() - Interrupt handler for node messages 1091 * @scifdev: Remote device to respond to 1092 * @qp: Remote memory pointer 1093 * 1094 * This routine is triggered by the interrupt mechanism. It reads 1095 * messages from the node queue RB and calls the Node QP Message handling 1096 * routine. 1097 */ 1098void scif_nodeqp_intrhandler(struct scif_dev *scifdev, struct scif_qp *qp) 1099{ 1100 struct scifmsg msg; 1101 int read_size; 1102 1103 do { 1104 read_size = scif_rb_get_next(&qp->inbound_q, &msg, sizeof(msg)); 1105 if (!read_size) 1106 break; 1107 scif_nodeqp_msg_handler(scifdev, qp, &msg); 1108 /* 1109 * The node queue pair is unmapped so skip the read pointer 1110 * update after receipt of a SCIF_EXIT_ACK 1111 */ 1112 if (SCIF_EXIT_ACK == msg.uop) 1113 break; 1114 scif_rb_update_read_ptr(&qp->inbound_q); 1115 } while (1); 1116} 1117 1118/** 1119 * scif_loopb_wq_handler - Loopback Workqueue Handler. 1120 * @work: loop back work 1121 * 1122 * This work queue routine is invoked by the loopback work queue handler. 1123 * It grabs the recv lock, dequeues any available messages from the head 1124 * of the loopback message list, calls the node QP message handler, 1125 * waits for it to return, then frees up this message and dequeues more 1126 * elements of the list if available. 1127 */ 1128static void scif_loopb_wq_handler(struct work_struct *unused) 1129{ 1130 struct scif_dev *scifdev = scif_info.loopb_dev; 1131 struct scif_qp *qp = scifdev->qpairs; 1132 struct scif_loopb_msg *msg; 1133 1134 do { 1135 msg = NULL; 1136 spin_lock(&qp->recv_lock); 1137 if (!list_empty(&scif_info.loopb_recv_q)) { 1138 msg = list_first_entry(&scif_info.loopb_recv_q, 1139 struct scif_loopb_msg, 1140 list); 1141 list_del(&msg->list); 1142 } 1143 spin_unlock(&qp->recv_lock); 1144 1145 if (msg) { 1146 scif_nodeqp_msg_handler(scifdev, qp, &msg->msg); 1147 kfree(msg); 1148 } 1149 } while (msg); 1150} 1151 1152/** 1153 * scif_loopb_msg_handler() - Workqueue handler for loopback messages. 1154 * @scifdev: SCIF device 1155 * @qp: Queue pair. 1156 * 1157 * This work queue routine is triggered when a loopback message is received. 1158 * 1159 * We need special handling for receiving Node Qp messages on a loopback SCIF 1160 * device via two workqueues for receiving messages. 1161 * 1162 * The reason we need the extra workqueue which is not required with *normal* 1163 * non-loopback SCIF devices is the potential classic deadlock described below: 1164 * 1165 * Thread A tries to send a message on a loopback SCIF device and blocks since 1166 * there is no space in the RB while it has the send_lock held or another 1167 * lock called lock X for example. 1168 * 1169 * Thread B: The Loopback Node QP message receive workqueue receives the message 1170 * and tries to send a message (eg an ACK) to the loopback SCIF device. It tries 1171 * to grab the send lock again or lock X and deadlocks with Thread A. The RB 1172 * cannot be drained any further due to this classic deadlock. 1173 * 1174 * In order to avoid deadlocks as mentioned above we have an extra level of 1175 * indirection achieved by having two workqueues. 1176 * 1) The first workqueue whose handler is scif_loopb_msg_handler reads 1177 * messages from the Node QP RB, adds them to a list and queues work for the 1178 * second workqueue. 1179 * 1180 * 2) The second workqueue whose handler is scif_loopb_wq_handler dequeues 1181 * messages from the list, handles them, frees up the memory and dequeues 1182 * more elements from the list if possible. 1183 */ 1184int 1185scif_loopb_msg_handler(struct scif_dev *scifdev, struct scif_qp *qp) 1186{ 1187 int read_size; 1188 struct scif_loopb_msg *msg; 1189 1190 do { 1191 msg = kmalloc(sizeof(*msg), GFP_KERNEL); 1192 if (!msg) 1193 return -ENOMEM; 1194 read_size = scif_rb_get_next(&qp->inbound_q, &msg->msg, 1195 sizeof(struct scifmsg)); 1196 if (read_size != sizeof(struct scifmsg)) { 1197 kfree(msg); 1198 scif_rb_update_read_ptr(&qp->inbound_q); 1199 break; 1200 } 1201 spin_lock(&qp->recv_lock); 1202 list_add_tail(&msg->list, &scif_info.loopb_recv_q); 1203 spin_unlock(&qp->recv_lock); 1204 queue_work(scif_info.loopb_wq, &scif_info.loopb_work); 1205 scif_rb_update_read_ptr(&qp->inbound_q); 1206 } while (read_size == sizeof(struct scifmsg)); 1207 return read_size; 1208} 1209 1210/** 1211 * scif_setup_loopback_qp - One time setup work for Loopback Node Qp. 1212 * @scifdev: SCIF device 1213 * 1214 * Sets up the required loopback workqueues, queue pairs and ring buffers 1215 */ 1216int scif_setup_loopback_qp(struct scif_dev *scifdev) 1217{ 1218 int err = 0; 1219 void *local_q; 1220 struct scif_qp *qp; 1221 1222 err = scif_setup_intr_wq(scifdev); 1223 if (err) 1224 goto exit; 1225 INIT_LIST_HEAD(&scif_info.loopb_recv_q); 1226 snprintf(scif_info.loopb_wqname, sizeof(scif_info.loopb_wqname), 1227 "SCIF LOOPB %d", scifdev->node); 1228 scif_info.loopb_wq = 1229 alloc_ordered_workqueue(scif_info.loopb_wqname, 0); 1230 if (!scif_info.loopb_wq) { 1231 err = -ENOMEM; 1232 goto destroy_intr; 1233 } 1234 INIT_WORK(&scif_info.loopb_work, scif_loopb_wq_handler); 1235 /* Allocate Self Qpair */ 1236 scifdev->qpairs = kzalloc(sizeof(*scifdev->qpairs), GFP_KERNEL); 1237 if (!scifdev->qpairs) { 1238 err = -ENOMEM; 1239 goto destroy_loopb_wq; 1240 } 1241 1242 qp = scifdev->qpairs; 1243 qp->magic = SCIFEP_MAGIC; 1244 spin_lock_init(&qp->send_lock); 1245 spin_lock_init(&qp->recv_lock); 1246 1247 local_q = kzalloc(SCIF_NODE_QP_SIZE, GFP_KERNEL); 1248 if (!local_q) { 1249 err = -ENOMEM; 1250 goto free_qpairs; 1251 } 1252 /* 1253 * For loopback the inbound_q and outbound_q are essentially the same 1254 * since the Node sends a message on the loopback interface to the 1255 * outbound_q which is then received on the inbound_q. 1256 */ 1257 scif_rb_init(&qp->outbound_q, 1258 &qp->local_read, 1259 &qp->local_write, 1260 local_q, get_count_order(SCIF_NODE_QP_SIZE)); 1261 1262 scif_rb_init(&qp->inbound_q, 1263 &qp->local_read, 1264 &qp->local_write, 1265 local_q, get_count_order(SCIF_NODE_QP_SIZE)); 1266 scif_info.nodeid = scifdev->node; 1267 1268 scif_peer_register_device(scifdev); 1269 1270 scif_info.loopb_dev = scifdev; 1271 return err; 1272free_qpairs: 1273 kfree(scifdev->qpairs); 1274destroy_loopb_wq: 1275 destroy_workqueue(scif_info.loopb_wq); 1276destroy_intr: 1277 scif_destroy_intr_wq(scifdev); 1278exit: 1279 return err; 1280} 1281 1282/** 1283 * scif_destroy_loopback_qp - One time uninit work for Loopback Node Qp 1284 * @scifdev: SCIF device 1285 * 1286 * Destroys the workqueues and frees up the Ring Buffer and Queue Pair memory. 1287 */ 1288int scif_destroy_loopback_qp(struct scif_dev *scifdev) 1289{ 1290 scif_peer_unregister_device(scifdev); 1291 destroy_workqueue(scif_info.loopb_wq); 1292 scif_destroy_intr_wq(scifdev); 1293 kfree(scifdev->qpairs->outbound_q.rb_base); 1294 kfree(scifdev->qpairs); 1295 scifdev->sdev = NULL; 1296 scif_info.loopb_dev = NULL; 1297 return 0; 1298} 1299 1300void scif_destroy_p2p(struct scif_dev *scifdev) 1301{ 1302 struct scif_dev *peer_dev; 1303 struct scif_p2p_info *p2p; 1304 struct list_head *pos, *tmp; 1305 int bd; 1306 1307 mutex_lock(&scif_info.conflock); 1308 /* Free P2P mappings in the given node for all its peer nodes */ 1309 list_for_each_safe(pos, tmp, &scifdev->p2p) { 1310 p2p = list_entry(pos, struct scif_p2p_info, ppi_list); 1311 dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO], 1312 p2p->sg_nentries[SCIF_PPI_MMIO], 1313 DMA_BIDIRECTIONAL); 1314 dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_APER], 1315 p2p->sg_nentries[SCIF_PPI_APER], 1316 DMA_BIDIRECTIONAL); 1317 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]); 1318 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]); 1319 list_del(pos); 1320 kfree(p2p); 1321 } 1322 1323 /* Free P2P mapping created in the peer nodes for the given node */ 1324 for (bd = SCIF_MGMT_NODE + 1; bd <= scif_info.maxid; bd++) { 1325 peer_dev = &scif_dev[bd]; 1326 list_for_each_safe(pos, tmp, &peer_dev->p2p) { 1327 p2p = list_entry(pos, struct scif_p2p_info, ppi_list); 1328 if (p2p->ppi_peer_id == scifdev->node) { 1329 dma_unmap_sg(&peer_dev->sdev->dev, 1330 p2p->ppi_sg[SCIF_PPI_MMIO], 1331 p2p->sg_nentries[SCIF_PPI_MMIO], 1332 DMA_BIDIRECTIONAL); 1333 dma_unmap_sg(&peer_dev->sdev->dev, 1334 p2p->ppi_sg[SCIF_PPI_APER], 1335 p2p->sg_nentries[SCIF_PPI_APER], 1336 DMA_BIDIRECTIONAL); 1337 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]); 1338 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]); 1339 list_del(pos); 1340 kfree(p2p); 1341 } 1342 } 1343 } 1344 mutex_unlock(&scif_info.conflock); 1345}