Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
at v2.6.34 (2284 lines, 59 kB)
#include "ceph_debug.h"

#include <linux/crc32c.h>
#include <linux/ctype.h>
#include <linux/highmem.h>
#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#include <net/tcp.h>

#include "super.h"
#include "messenger.h"
#include "decode.h"
#include "pagelist.h"

/*
 * Ceph uses the messenger to exchange ceph_msg messages with other
 * hosts in the system. The messenger provides ordered and reliable
 * delivery. We tolerate TCP disconnects by reconnecting (with
 * exponential backoff) in the case of a fault (disconnection, bad
 * crc, protocol error). Acks allow sent messages to be discarded by
 * the sender.
 */

/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;

#ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class;
#endif


static void queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
static void ceph_fault(struct ceph_connection *con);

const char *ceph_name_type_str(int t)
{
	switch (t) {
	case CEPH_ENTITY_TYPE_MON: return "mon";
	case CEPH_ENTITY_TYPE_MDS: return "mds";
	case CEPH_ENTITY_TYPE_OSD: return "osd";
	case CEPH_ENTITY_TYPE_CLIENT: return "client";
	case CEPH_ENTITY_TYPE_ADMIN: return "admin";
	default: return "???";
	}
}

/*
 * nicely render a sockaddr as a string.
 */
#define MAX_ADDR_STR 20
static char addr_str[MAX_ADDR_STR][40];
static DEFINE_SPINLOCK(addr_str_lock);
static int last_addr_str;

const char *pr_addr(const struct sockaddr_storage *ss)
{
	int i;
	char *s;
	struct sockaddr_in *in4 = (void *)ss;
	unsigned char *quad = (void *)&in4->sin_addr.s_addr;
	struct sockaddr_in6 *in6 = (void *)ss;

	spin_lock(&addr_str_lock);
	i = last_addr_str++;
	if (last_addr_str == MAX_ADDR_STR)
		last_addr_str = 0;
	spin_unlock(&addr_str_lock);
	s = addr_str[i];

	switch (ss->ss_family) {
	case AF_INET:
		sprintf(s, "%u.%u.%u.%u:%u",
			(unsigned int)quad[0],
			(unsigned int)quad[1],
			(unsigned int)quad[2],
			(unsigned int)quad[3],
			(unsigned int)ntohs(in4->sin_port));
		break;

	case AF_INET6:
		sprintf(s, "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%u",
			in6->sin6_addr.s6_addr16[0],
			in6->sin6_addr.s6_addr16[1],
			in6->sin6_addr.s6_addr16[2],
			in6->sin6_addr.s6_addr16[3],
			in6->sin6_addr.s6_addr16[4],
			in6->sin6_addr.s6_addr16[5],
			in6->sin6_addr.s6_addr16[6],
			in6->sin6_addr.s6_addr16[7],
			(unsigned int)ntohs(in6->sin6_port));
		break;

	default:
		sprintf(s, "(unknown sockaddr family %d)", (int)ss->ss_family);
	}

	return s;
}

static void encode_my_addr(struct ceph_messenger *msgr)
{
	memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
	ceph_encode_addr(&msgr->my_enc_addr);
}

/*
 * work queue for all reading and writing to/from the socket.
 */
struct workqueue_struct *ceph_msgr_wq;

int __init ceph_msgr_init(void)
{
	ceph_msgr_wq = create_workqueue("ceph-msgr");
	if (!ceph_msgr_wq) {
		pr_err("msgr_init failed to create workqueue\n");
		return -ENOMEM;
	}
	return 0;
}

void ceph_msgr_exit(void)
{
	destroy_workqueue(ceph_msgr_wq);
}

/*
 * socket callback functions
 */

/* data available on socket, or listen socket received a connect */
static void ceph_data_ready(struct sock *sk, int count_unused)
{
	struct ceph_connection *con =
		(struct ceph_connection *)sk->sk_user_data;
	if (sk->sk_state != TCP_CLOSE_WAIT) {
		dout("ceph_data_ready on %p state = %lu, queueing work\n",
		     con, con->state);
		queue_con(con);
	}
}

/* socket has buffer space for writing */
static void ceph_write_space(struct sock *sk)
{
	struct ceph_connection *con =
		(struct ceph_connection *)sk->sk_user_data;

	/* only queue to workqueue if there is data we want to write. */
	if (test_bit(WRITE_PENDING, &con->state)) {
		dout("ceph_write_space %p queueing write work\n", con);
		queue_con(con);
	} else {
		dout("ceph_write_space %p nothing to write\n", con);
	}

	/* since we have our own write_space, clear the SOCK_NOSPACE flag */
	clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
}

/* socket's state has changed */
static void ceph_state_change(struct sock *sk)
{
	struct ceph_connection *con =
		(struct ceph_connection *)sk->sk_user_data;

	dout("ceph_state_change %p state = %lu sk_state = %u\n",
	     con, con->state, sk->sk_state);

	if (test_bit(CLOSED, &con->state))
		return;

	switch (sk->sk_state) {
	case TCP_CLOSE:
		dout("ceph_state_change TCP_CLOSE\n");
		/* fall through */
	case TCP_CLOSE_WAIT:
		dout("ceph_state_change TCP_CLOSE_WAIT\n");
		if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
			if (test_bit(CONNECTING, &con->state))
				con->error_msg = "connection failed";
			else
				con->error_msg = "socket closed";
			queue_con(con);
		}
		break;
	case TCP_ESTABLISHED:
		dout("ceph_state_change TCP_ESTABLISHED\n");
		queue_con(con);
		break;
	}
}

/*
 * set up socket callbacks
 */
static void set_sock_callbacks(struct socket *sock,
			       struct ceph_connection *con)
{
	struct sock *sk = sock->sk;
	sk->sk_user_data = (void *)con;
	sk->sk_data_ready = ceph_data_ready;
	sk->sk_write_space = ceph_write_space;
	sk->sk_state_change = ceph_state_change;
}

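/*
 * Illustrative note (added; not part of the original file): the three
 * sk_* callbacks above deliberately do almost nothing but call
 * queue_con().  They run in softirq context, where taking con->mutex
 * or doing socket I/O is not allowed, so all real work is deferred to
 * con_work() on the ceph-msgr workqueue:
 *
 *	tcp data arrives    -> ceph_data_ready()   -> queue_con(con)
 *	send buffer drains  -> ceph_write_space()  -> queue_con(con)
 *	socket state change -> ceph_state_change() -> queue_con(con)
 */
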
/*
 * socket helpers
 */

/*
 * initiate connection to a remote socket.
 */
static struct socket *ceph_tcp_connect(struct ceph_connection *con)
{
	struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.in_addr;
	struct socket *sock;
	int ret;

	BUG_ON(con->sock);
	ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
	if (ret)
		return ERR_PTR(ret);
	con->sock = sock;
	sock->sk->sk_allocation = GFP_NOFS;

#ifdef CONFIG_LOCKDEP
	lockdep_set_class(&sock->sk->sk_lock, &socket_class);
#endif

	set_sock_callbacks(sock, con);

	dout("connect %s\n", pr_addr(&con->peer_addr.in_addr));

	ret = sock->ops->connect(sock, paddr, sizeof(*paddr), O_NONBLOCK);
	if (ret == -EINPROGRESS) {
		dout("connect %s EINPROGRESS sk_state = %u\n",
		     pr_addr(&con->peer_addr.in_addr),
		     sock->sk->sk_state);
		ret = 0;
	}
	if (ret < 0) {
		pr_err("connect %s error %d\n",
		       pr_addr(&con->peer_addr.in_addr), ret);
		sock_release(sock);
		con->sock = NULL;
		con->error_msg = "connect error";
	}

	if (ret < 0)
		return ERR_PTR(ret);
	return sock;
}

static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
{
	struct kvec iov = {buf, len};
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };

	return kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
}

/*
 * write something. @more is true if caller will be sending more data
 * shortly.
 */
static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
			    size_t kvlen, size_t len, int more)
{
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };

	if (more)
		msg.msg_flags |= MSG_MORE;
	else
		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */

	return kernel_sendmsg(sock, &msg, iov, kvlen, len);
}


/*
 * Shutdown/close the socket for the given connection.
 */
static int con_close_socket(struct ceph_connection *con)
{
	int rc;

	dout("con_close_socket on %p sock %p\n", con, con->sock);
	if (!con->sock)
		return 0;
	set_bit(SOCK_CLOSED, &con->state);
	rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
	sock_release(con->sock);
	con->sock = NULL;
	clear_bit(SOCK_CLOSED, &con->state);
	return rc;
}

/*
 * Reset a connection. Discard all incoming and outgoing messages
 * and clear *_seq state.
 */
static void ceph_msg_remove(struct ceph_msg *msg)
{
	list_del_init(&msg->list_head);
	ceph_msg_put(msg);
}
static void ceph_msg_remove_list(struct list_head *head)
{
	while (!list_empty(head)) {
		struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
							list_head);
		ceph_msg_remove(msg);
	}
}

static void reset_connection(struct ceph_connection *con)
{
	/* reset connection, out_queue, msg_ and connect_seq */
	/* discard existing out_queue and msg_seq */
	ceph_msg_remove_list(&con->out_queue);
	ceph_msg_remove_list(&con->out_sent);

	if (con->in_msg) {
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	}

	con->connect_seq = 0;
	con->out_seq = 0;
	if (con->out_msg) {
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	}
	con->in_seq = 0;
	con->in_seq_acked = 0;
}

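/*
 * Illustrative sketch (added; not part of the original file): how the
 * sendmsg helper above is typically fed.  Small control buffers are
 * batched into a kvec array, and @more becomes MSG_MORE so a tag byte
 * and the following header can share a TCP segment.  Names here are
 * hypothetical; compiled out.
 */
#if 0
	struct kvec vec[2] = {
		{ &tag_msg, 1 },			/* one-byte tag */
		{ &msg->hdr, sizeof(msg->hdr) }		/* message header */
	};
	int ret = ceph_tcp_sendmsg(sock, vec, 2, 1 + sizeof(msg->hdr), 1);
#endif
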
/*
 * mark a peer down. drop any open connections.
 */
void ceph_con_close(struct ceph_connection *con)
{
	dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
	set_bit(CLOSED, &con->state);  /* in case there's queued work */
	clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
	clear_bit(LOSSYTX, &con->state);  /* so we retry next connect */
	clear_bit(KEEPALIVE_PENDING, &con->state);
	clear_bit(WRITE_PENDING, &con->state);
	mutex_lock(&con->mutex);
	reset_connection(con);
	cancel_delayed_work(&con->work);
	mutex_unlock(&con->mutex);
	queue_con(con);
}

/*
 * Reopen a closed connection, with a new peer address.
 */
void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
{
	dout("con_open %p %s\n", con, pr_addr(&addr->in_addr));
	set_bit(OPENING, &con->state);
	clear_bit(CLOSED, &con->state);
	memcpy(&con->peer_addr, addr, sizeof(*addr));
	con->delay = 0;      /* reset backoff memory */
	queue_con(con);
}

/*
 * return true if this connection ever successfully opened
 */
bool ceph_con_opened(struct ceph_connection *con)
{
	return con->connect_seq > 0;
}

/*
 * generic get/put
 */
struct ceph_connection *ceph_con_get(struct ceph_connection *con)
{
	dout("con_get %p nref = %d -> %d\n", con,
	     atomic_read(&con->nref), atomic_read(&con->nref) + 1);
	if (atomic_inc_not_zero(&con->nref))
		return con;
	return NULL;
}

void ceph_con_put(struct ceph_connection *con)
{
	dout("con_put %p nref = %d -> %d\n", con,
	     atomic_read(&con->nref), atomic_read(&con->nref) - 1);
	BUG_ON(atomic_read(&con->nref) == 0);
	if (atomic_dec_and_test(&con->nref)) {
		BUG_ON(con->sock);
		kfree(con);
	}
}

/*
 * initialize a new connection.
 */
void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
{
	dout("con_init %p\n", con);
	memset(con, 0, sizeof(*con));
	atomic_set(&con->nref, 1);
	con->msgr = msgr;
	mutex_init(&con->mutex);
	INIT_LIST_HEAD(&con->out_queue);
	INIT_LIST_HEAD(&con->out_sent);
	INIT_DELAYED_WORK(&con->work, con_work);
}


/*
 * We maintain a global counter to order connection attempts. Get
 * a unique seq greater than @gt.
 */
static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
	u32 ret;

	spin_lock(&msgr->global_seq_lock);
	if (msgr->global_seq < gt)
		msgr->global_seq = gt;
	ret = ++msgr->global_seq;
	spin_unlock(&msgr->global_seq_lock);
	return ret;
}


/*
 * Prepare footer for currently outgoing message, and finish things
 * off. Assumes out_kvec* are already valid.. we just add on to the end.
 */
static void prepare_write_message_footer(struct ceph_connection *con, int v)
{
	struct ceph_msg *m = con->out_msg;

	dout("prepare_write_message_footer %p\n", con);
	con->out_kvec_is_msg = true;
	con->out_kvec[v].iov_base = &m->footer;
	con->out_kvec[v].iov_len = sizeof(m->footer);
	con->out_kvec_bytes += sizeof(m->footer);
	con->out_kvec_left++;
	con->out_more = m->more_to_follow;
	con->out_msg_done = true;
}

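/*
 * Illustrative sketch (added; not part of the original file):
 * ceph_con_get() uses atomic_inc_not_zero() so that a lookup racing
 * with the final ceph_con_put() cannot resurrect a connection whose
 * refcount already hit zero.  A hypothetical caller, compiled out:
 */
#if 0
	struct ceph_connection *con = ceph_con_get(some_con);

	if (!con)
		return;			/* teardown won the race */
	/* ... use con ... */
	ceph_con_put(con);		/* may kfree(con) */
#endif
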
/*
 * Prepare headers for the next outgoing message.
 */
static void prepare_write_message(struct ceph_connection *con)
{
	struct ceph_msg *m;
	int v = 0;

	con->out_kvec_bytes = 0;
	con->out_kvec_is_msg = true;
	con->out_msg_done = false;

	/* Sneak an ack in there first? If we can get it into the same
	 * TCP packet that's a good thing. */
	if (con->in_seq > con->in_seq_acked) {
		con->in_seq_acked = con->in_seq;
		con->out_kvec[v].iov_base = &tag_ack;
		con->out_kvec[v++].iov_len = 1;
		con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
		con->out_kvec[v].iov_base = &con->out_temp_ack;
		con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack);
		con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
	}

	m = list_first_entry(&con->out_queue,
			     struct ceph_msg, list_head);
	con->out_msg = m;
	if (test_bit(LOSSYTX, &con->state)) {
		list_del_init(&m->list_head);
	} else {
		/* put message on sent list */
		ceph_msg_get(m);
		list_move_tail(&m->list_head, &con->out_sent);
	}

	/*
	 * only assign outgoing seq # if we haven't sent this message
	 * yet. if it is requeued, resend with its original seq.
	 */
	if (m->needs_out_seq) {
		m->hdr.seq = cpu_to_le64(++con->out_seq);
		m->needs_out_seq = false;
	}

	dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
	     m, con->out_seq, le16_to_cpu(m->hdr.type),
	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
	     le32_to_cpu(m->hdr.data_len),
	     m->nr_pages);
	BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);

	/* tag + hdr + front + middle */
	con->out_kvec[v].iov_base = &tag_msg;
	con->out_kvec[v++].iov_len = 1;
	con->out_kvec[v].iov_base = &m->hdr;
	con->out_kvec[v++].iov_len = sizeof(m->hdr);
	con->out_kvec[v++] = m->front;
	if (m->middle)
		con->out_kvec[v++] = m->middle->vec;
	con->out_kvec_left = v;
	con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
		(m->middle ? m->middle->vec.iov_len : 0);
	con->out_kvec_cur = con->out_kvec;

	/* fill in crc (except data pages), footer */
	con->out_msg->hdr.crc =
		cpu_to_le32(crc32c(0, (void *)&m->hdr,
				   sizeof(m->hdr) - sizeof(m->hdr.crc)));
	con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
	con->out_msg->footer.front_crc =
		cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
	if (m->middle)
		con->out_msg->footer.middle_crc =
			cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
					   m->middle->vec.iov_len));
	else
		con->out_msg->footer.middle_crc = 0;
	con->out_msg->footer.data_crc = 0;
	dout("prepare_write_message front_crc %u data_crc %u\n",
	     le32_to_cpu(con->out_msg->footer.front_crc),
	     le32_to_cpu(con->out_msg->footer.middle_crc));

	/* is there a data payload? */
	if (le32_to_cpu(m->hdr.data_len) > 0) {
		/* initialize page iterator */
		con->out_msg_pos.page = 0;
		con->out_msg_pos.page_pos =
			le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
		con->out_msg_pos.data_pos = 0;
		con->out_msg_pos.did_page_crc = 0;
		con->out_more = 1;  /* data + footer will follow */
	} else {
		/* no, queue up footer too and be done */
		prepare_write_message_footer(con, v);
	}

	set_bit(WRITE_PENDING, &con->state);
}

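/*
 * Illustrative summary (added; not part of the original file): what
 * prepare_write_message() queues on the wire, in order.  The data
 * pages and footer are sent later by write_partial_msg_pages() when
 * data_len > 0:
 *
 *	[tag_ack + ack seq]	optional, piggybacked if in_seq advanced
 *	[tag_msg]		one byte
 *	[ceph_msg_header]	carries its own crc
 *	[front]			front_len bytes
 *	[middle]		optional, middle_len bytes
 *	[data pages]		optional, data_len bytes
 *	[ceph_msg_footer]	front/middle/data crcs + flags
 */
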
/*
 * Prepare an ack.
 */
static void prepare_write_ack(struct ceph_connection *con)
{
	dout("prepare_write_ack %p %llu -> %llu\n", con,
	     con->in_seq_acked, con->in_seq);
	con->in_seq_acked = con->in_seq;

	con->out_kvec[0].iov_base = &tag_ack;
	con->out_kvec[0].iov_len = 1;
	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
	con->out_kvec[1].iov_base = &con->out_temp_ack;
	con->out_kvec[1].iov_len = sizeof(con->out_temp_ack);
	con->out_kvec_left = 2;
	con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 1;  /* more will follow.. eventually.. */
	set_bit(WRITE_PENDING, &con->state);
}

/*
 * Prepare to write keepalive byte.
 */
static void prepare_write_keepalive(struct ceph_connection *con)
{
	dout("prepare_write_keepalive %p\n", con);
	con->out_kvec[0].iov_base = &tag_keepalive;
	con->out_kvec[0].iov_len = 1;
	con->out_kvec_left = 1;
	con->out_kvec_bytes = 1;
	con->out_kvec_cur = con->out_kvec;
	set_bit(WRITE_PENDING, &con->state);
}

/*
 * Connection negotiation.
 */

static void prepare_connect_authorizer(struct ceph_connection *con)
{
	void *auth_buf;
	int auth_len = 0;
	int auth_protocol = 0;

	mutex_unlock(&con->mutex);
	if (con->ops->get_authorizer)
		con->ops->get_authorizer(con, &auth_buf, &auth_len,
					 &auth_protocol, &con->auth_reply_buf,
					 &con->auth_reply_buf_len,
					 con->auth_retry);
	mutex_lock(&con->mutex);

	con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
	con->out_connect.authorizer_len = cpu_to_le32(auth_len);

	con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
	con->out_kvec[con->out_kvec_left].iov_len = auth_len;
	con->out_kvec_left++;
	con->out_kvec_bytes += auth_len;
}

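/*
 * Illustrative note (added; not part of the original file):
 * prepare_connect_authorizer() drops con->mutex around
 * ->get_authorizer(), presumably because that callback reaches back
 * into the auth layer, which may take its own locks or block; holding
 * the connection mutex across it would invite deadlock.  The pattern
 * relies on the worker holding a connection reference, so con cannot
 * be freed while the mutex is dropped.
 */
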
/*
 * We connected to a peer and are saying hello.
 */
static void prepare_write_banner(struct ceph_messenger *msgr,
				 struct ceph_connection *con)
{
	int len = strlen(CEPH_BANNER);

	con->out_kvec[0].iov_base = CEPH_BANNER;
	con->out_kvec[0].iov_len = len;
	con->out_kvec[1].iov_base = &msgr->my_enc_addr;
	con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
	con->out_kvec_left = 2;
	con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 0;
	set_bit(WRITE_PENDING, &con->state);
}

static void prepare_write_connect(struct ceph_messenger *msgr,
				  struct ceph_connection *con,
				  int after_banner)
{
	unsigned global_seq = get_global_seq(con->msgr, 0);
	int proto;

	switch (con->peer_name.type) {
	case CEPH_ENTITY_TYPE_MON:
		proto = CEPH_MONC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_OSD:
		proto = CEPH_OSDC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_MDS:
		proto = CEPH_MDSC_PROTOCOL;
		break;
	default:
		BUG();
	}

	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
	     con->connect_seq, global_seq, proto);

	con->out_connect.features = CEPH_FEATURE_SUPPORTED;
	con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
	con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
	con->out_connect.global_seq = cpu_to_le32(global_seq);
	con->out_connect.protocol_version = cpu_to_le32(proto);
	con->out_connect.flags = 0;

	if (!after_banner) {
		con->out_kvec_left = 0;
		con->out_kvec_bytes = 0;
	}
	con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
	con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
	con->out_kvec_left++;
	con->out_kvec_bytes += sizeof(con->out_connect);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 0;
	set_bit(WRITE_PENDING, &con->state);

	prepare_connect_authorizer(con);
}


/*
 * write as much of pending kvecs to the socket as we can.
 *  1 -> done
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_kvec(struct ceph_connection *con)
{
	int ret;

	dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
	while (con->out_kvec_bytes > 0) {
		ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
				       con->out_kvec_left, con->out_kvec_bytes,
				       con->out_more);
		if (ret <= 0)
			goto out;
		con->out_kvec_bytes -= ret;
		if (con->out_kvec_bytes == 0)
			break;            /* done */
		while (ret > 0) {
			if (ret >= con->out_kvec_cur->iov_len) {
				ret -= con->out_kvec_cur->iov_len;
				con->out_kvec_cur++;
				con->out_kvec_left--;
			} else {
				con->out_kvec_cur->iov_len -= ret;
				con->out_kvec_cur->iov_base += ret;
				ret = 0;
				break;
			}
		}
	}
	con->out_kvec_left = 0;
	con->out_kvec_is_msg = false;
	ret = 1;
out:
	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
	     con->out_kvec_bytes, con->out_kvec_left, ret);
	return ret;  /* done! */
}

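/*
 * Illustrative summary (added; not part of the original file): the
 * handshake that the prepare_write_* functions above queue and that
 * read_partial_banner()/read_partial_connect() consume.  We send
 * CEPH_BANNER, our encoded address, and a ceph_msg_connect (plus
 * optional authorizer); the peer replies with CEPH_BANNER, its own
 * address, the address it sees for us, and a ceph_msg_connect_reply
 * whose tag (READY, RETRY_*, WAIT, ...) drives process_connect().
 */
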
/*
 * Write as much message data payload as we can. If we finish, queue
 * up the footer.
 *  1 -> done, footer is now queued in out_kvec[].
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_msg_pages(struct ceph_connection *con)
{
	struct ceph_msg *msg = con->out_msg;
	unsigned data_len = le32_to_cpu(msg->hdr.data_len);
	size_t len;
	int crc = con->msgr->nocrc;
	int ret;

	dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
	     con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
	     con->out_msg_pos.page_pos);

	while (con->out_msg_pos.page < con->out_msg->nr_pages) {
		struct page *page = NULL;
		void *kaddr = NULL;

		/*
		 * if we are calculating the data crc (the default), we need
		 * to map the page. if our pages[] has been revoked, use the
		 * zero page.
		 */
		if (msg->pages) {
			page = msg->pages[con->out_msg_pos.page];
			if (crc)
				kaddr = kmap(page);
		} else if (msg->pagelist) {
			page = list_first_entry(&msg->pagelist->head,
						struct page, lru);
			if (crc)
				kaddr = kmap(page);
		} else {
			page = con->msgr->zero_page;
			if (crc)
				kaddr = page_address(con->msgr->zero_page);
		}
		len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
			  (int)(data_len - con->out_msg_pos.data_pos));
		if (crc && !con->out_msg_pos.did_page_crc) {
			void *base = kaddr + con->out_msg_pos.page_pos;
			u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);

			BUG_ON(kaddr == NULL);
			con->out_msg->footer.data_crc =
				cpu_to_le32(crc32c(tmpcrc, base, len));
			con->out_msg_pos.did_page_crc = 1;
		}

		ret = kernel_sendpage(con->sock, page,
				      con->out_msg_pos.page_pos, len,
				      MSG_DONTWAIT | MSG_NOSIGNAL |
				      MSG_MORE);

		if (crc && (msg->pages || msg->pagelist))
			kunmap(page);

		if (ret <= 0)
			goto out;

		con->out_msg_pos.data_pos += ret;
		con->out_msg_pos.page_pos += ret;
		if (ret == len) {
			con->out_msg_pos.page_pos = 0;
			con->out_msg_pos.page++;
			con->out_msg_pos.did_page_crc = 0;
			if (msg->pagelist)
				list_move_tail(&page->lru,
					       &msg->pagelist->head);
		}
	}

	dout("write_partial_msg_pages %p msg %p done\n", con, msg);

	/* prepare and queue up footer, too */
	if (!crc)
		con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
	con->out_kvec_bytes = 0;
	con->out_kvec_left = 0;
	con->out_kvec_cur = con->out_kvec;
	prepare_write_message_footer(con, 0);
	ret = 1;
out:
	return ret;
}

/*
 * write some zeros
 */
static int write_partial_skip(struct ceph_connection *con)
{
	int ret;

	while (con->out_skip > 0) {
		struct kvec iov = {
			.iov_base = page_address(con->msgr->zero_page),
			.iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
		};

		ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
		if (ret <= 0)
			goto out;
		con->out_skip -= ret;
	}
	ret = 1;
out:
	return ret;
}

/*
 * Prepare to read connection handshake, or an ack.
 */
static void prepare_read_banner(struct ceph_connection *con)
{
	dout("prepare_read_banner %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_connect(struct ceph_connection *con)
{
	dout("prepare_read_connect %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_ack(struct ceph_connection *con)
{
	dout("prepare_read_ack %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_tag(struct ceph_connection *con)
{
	dout("prepare_read_tag %p\n", con);
	con->in_base_pos = 0;
	con->in_tag = CEPH_MSGR_TAG_READY;
}

/*
 * Prepare to read a message.
 */
static int prepare_read_message(struct ceph_connection *con)
{
	dout("prepare_read_message %p\n", con);
	BUG_ON(con->in_msg != NULL);
	con->in_base_pos = 0;
	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
	return 0;
}


static int read_partial(struct ceph_connection *con,
			int *to, int size, void *object)
{
	*to += size;
	while (con->in_base_pos < *to) {
		int left = *to - con->in_base_pos;
		int have = size - left;
		int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
		if (ret <= 0)
			return ret;
		con->in_base_pos += ret;
	}
	return 1;
}


/*
 * Read all or part of the connect-side handshake on a new connection
 */
static int read_partial_banner(struct ceph_connection *con)
{
	int ret, to = 0;

	dout("read_partial_banner %p at %d\n", con, con->in_base_pos);

	/* peer's banner */
	ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
	if (ret <= 0)
		goto out;
	ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
			   &con->actual_peer_addr);
	if (ret <= 0)
		goto out;
	ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
			   &con->peer_addr_for_me);
	if (ret <= 0)
		goto out;
out:
	return ret;
}

static int read_partial_connect(struct ceph_connection *con)
{
	int ret, to = 0;

	dout("read_partial_connect %p at %d\n", con, con->in_base_pos);

	ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
	if (ret <= 0)
		goto out;
	ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len),
			   con->auth_reply_buf);
	if (ret <= 0)
		goto out;

	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
	     con, (int)con->in_reply.tag,
	     le32_to_cpu(con->in_reply.connect_seq),
	     le32_to_cpu(con->in_reply.global_seq));
out:
	return ret;

}

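/*
 * Illustrative note (added; not part of the original file):
 * read_partial() is a resumable-read accumulator.  @to is a running
 * end offset within the current read phase, and con->in_base_pos
 * records progress across -EAGAIN, so re-running the same sequence of
 * calls after the socket becomes readable again is idempotent:
 *
 *	int to = 0;
 *	ret = read_partial(con, &to, sizeof(a), &a);
 *	if (ret <= 0)
 *		return ret;
 *	ret = read_partial(con, &to, sizeof(b), &b);
 */
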
/*
 * Verify the hello banner looks okay.
 */
static int verify_hello(struct ceph_connection *con)
{
	if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
		pr_err("connect to %s got bad banner\n",
		       pr_addr(&con->peer_addr.in_addr));
		con->error_msg = "protocol error, bad banner";
		return -1;
	}
	return 0;
}

static bool addr_is_blank(struct sockaddr_storage *ss)
{
	switch (ss->ss_family) {
	case AF_INET:
		return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0;
	case AF_INET6:
		return
		 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 &&
		 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 &&
		 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 &&
		 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0;
	}
	return false;
}

static int addr_port(struct sockaddr_storage *ss)
{
	switch (ss->ss_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)ss)->sin_port);
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)ss)->sin6_port);
	}
	return 0;
}

static void addr_set_port(struct sockaddr_storage *ss, int p)
{
	switch (ss->ss_family) {
	case AF_INET:
		((struct sockaddr_in *)ss)->sin_port = htons(p);
		break;
	case AF_INET6:
		((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
	}
}

/*
 * Parse an ip[:port] list into an addr array. Use the default
 * monitor port if a port isn't specified.
 */
int ceph_parse_ips(const char *c, const char *end,
		   struct ceph_entity_addr *addr,
		   int max_count, int *count)
{
	int i;
	const char *p = c;

	dout("parse_ips on '%.*s'\n", (int)(end-c), c);
	for (i = 0; i < max_count; i++) {
		const char *ipend;
		struct sockaddr_storage *ss = &addr[i].in_addr;
		struct sockaddr_in *in4 = (void *)ss;
		struct sockaddr_in6 *in6 = (void *)ss;
		int port;

		memset(ss, 0, sizeof(*ss));
		if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr,
			     ',', &ipend)) {
			ss->ss_family = AF_INET;
		} else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr,
				    ',', &ipend)) {
			ss->ss_family = AF_INET6;
		} else {
			goto bad;
		}
		p = ipend;

		/* port? */
		if (p < end && *p == ':') {
			port = 0;
			p++;
			while (p < end && *p >= '0' && *p <= '9') {
				port = (port * 10) + (*p - '0');
				p++;
			}
			if (port > 65535 || port == 0)
				goto bad;
		} else {
			port = CEPH_MON_PORT;
		}

		addr_set_port(ss, port);

		dout("parse_ips got %s\n", pr_addr(ss));

		if (p == end)
			break;
		if (*p != ',')
			goto bad;
		p++;
	}

	if (p != end)
		goto bad;

	if (count)
		*count = i + 1;
	return 0;

bad:
	pr_err("parse_ips bad ip '%s'\n", c);
	return -EINVAL;
}

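/*
 * Illustrative sketch (added; not part of the original file): a
 * hypothetical ceph_parse_ips() call, as a mount-time monitor list
 * would use it.  Compiled out.
 */
#if 0
	struct ceph_entity_addr mon_addr[3];
	const char *s = "192.168.0.1:6789,192.168.0.2";
	int num_mon, err;

	err = ceph_parse_ips(s, s + strlen(s), mon_addr,
			     ARRAY_SIZE(mon_addr), &num_mon);
	/* on success num_mon == 2; the second addr gets CEPH_MON_PORT */
#endif
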
static int process_banner(struct ceph_connection *con)
{
	dout("process_banner on %p\n", con);

	if (verify_hello(con) < 0)
		return -1;

	ceph_decode_addr(&con->actual_peer_addr);
	ceph_decode_addr(&con->peer_addr_for_me);

	/*
	 * Make sure the other end is who we wanted. note that the other
	 * end may not yet know their ip address, so if it's 0.0.0.0, give
	 * them the benefit of the doubt.
	 */
	if (memcmp(&con->peer_addr, &con->actual_peer_addr,
		   sizeof(con->peer_addr)) != 0 &&
	    !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
	      con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
		pr_warning("wrong peer, want %s/%lld, got %s/%lld\n",
			   pr_addr(&con->peer_addr.in_addr),
			   le64_to_cpu(con->peer_addr.nonce),
			   pr_addr(&con->actual_peer_addr.in_addr),
			   le64_to_cpu(con->actual_peer_addr.nonce));
		con->error_msg = "wrong peer at address";
		return -1;
	}

	/*
	 * did we learn our address?
	 */
	if (addr_is_blank(&con->msgr->inst.addr.in_addr)) {
		int port = addr_port(&con->msgr->inst.addr.in_addr);

		memcpy(&con->msgr->inst.addr.in_addr,
		       &con->peer_addr_for_me.in_addr,
		       sizeof(con->peer_addr_for_me.in_addr));
		addr_set_port(&con->msgr->inst.addr.in_addr, port);
		encode_my_addr(con->msgr);
		dout("process_banner learned my addr is %s\n",
		     pr_addr(&con->msgr->inst.addr.in_addr));
	}

	set_bit(NEGOTIATING, &con->state);
	prepare_read_connect(con);
	return 0;
}

static void fail_protocol(struct ceph_connection *con)
{
	reset_connection(con);
	set_bit(CLOSED, &con->state);  /* in case there's queued work */

	mutex_unlock(&con->mutex);
	if (con->ops->bad_proto)
		con->ops->bad_proto(con);
	mutex_lock(&con->mutex);
}

static int process_connect(struct ceph_connection *con)
{
	u64 sup_feat = CEPH_FEATURE_SUPPORTED;
	u64 req_feat = CEPH_FEATURE_REQUIRED;
	u64 server_feat = le64_to_cpu(con->in_reply.features);

	dout("process_connect on %p tag %d\n", con, (int)con->in_tag);

	switch (con->in_reply.tag) {
	case CEPH_MSGR_TAG_FEATURES:
		pr_err("%s%lld %s feature set mismatch,"
		       " my %llx < server's %llx, missing %llx\n",
		       ENTITY_NAME(con->peer_name),
		       pr_addr(&con->peer_addr.in_addr),
		       sup_feat, server_feat, server_feat & ~sup_feat);
		con->error_msg = "missing required protocol features";
		fail_protocol(con);
		return -1;

	case CEPH_MSGR_TAG_BADPROTOVER:
		pr_err("%s%lld %s protocol version mismatch,"
		       " my %d != server's %d\n",
		       ENTITY_NAME(con->peer_name),
		       pr_addr(&con->peer_addr.in_addr),
		       le32_to_cpu(con->out_connect.protocol_version),
		       le32_to_cpu(con->in_reply.protocol_version));
		con->error_msg = "protocol version mismatch";
		fail_protocol(con);
		return -1;

	case CEPH_MSGR_TAG_BADAUTHORIZER:
		con->auth_retry++;
		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
		     con->auth_retry);
		if (con->auth_retry == 2) {
			con->error_msg = "connect authorization failure";
			reset_connection(con);
			set_bit(CLOSED, &con->state);
			return -1;
		}
		con->auth_retry = 1;
		prepare_write_connect(con->msgr, con, 0);
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_RESETSESSION:
		/*
		 * If we connected with a large connect_seq but the peer
		 * has no record of a session with us (no connection, or
		 * connect_seq == 0), they will send RESETSESSION to indicate
		 * that they must have reset their session, and may have
		 * dropped messages.
		 */
		dout("process_connect got RESET peer seq %u\n",
		     le32_to_cpu(con->in_connect.connect_seq));
		pr_err("%s%lld %s connection reset\n",
		       ENTITY_NAME(con->peer_name),
		       pr_addr(&con->peer_addr.in_addr));
		reset_connection(con);
		prepare_write_connect(con->msgr, con, 0);
		prepare_read_connect(con);

		/* Tell ceph about it. */
		mutex_unlock(&con->mutex);
		pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
		if (con->ops->peer_reset)
			con->ops->peer_reset(con);
		mutex_lock(&con->mutex);
		break;

	case CEPH_MSGR_TAG_RETRY_SESSION:
		/*
		 * If we sent a smaller connect_seq than the peer has, try
		 * again with a larger value.
		 */
		dout("process_connect got RETRY my seq = %u, peer_seq = %u\n",
		     le32_to_cpu(con->out_connect.connect_seq),
		     le32_to_cpu(con->in_connect.connect_seq));
		con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
		prepare_write_connect(con->msgr, con, 0);
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_RETRY_GLOBAL:
		/*
		 * If we sent a smaller global_seq than the peer has, try
		 * again with a larger value.
		 */
		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
		     con->peer_global_seq,
		     le32_to_cpu(con->in_connect.global_seq));
		get_global_seq(con->msgr,
			       le32_to_cpu(con->in_connect.global_seq));
		prepare_write_connect(con->msgr, con, 0);
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_READY:
		if (req_feat & ~server_feat) {
			pr_err("%s%lld %s protocol feature mismatch,"
			       " my required %llx > server's %llx, need %llx\n",
			       ENTITY_NAME(con->peer_name),
			       pr_addr(&con->peer_addr.in_addr),
			       req_feat, server_feat, req_feat & ~server_feat);
			con->error_msg = "missing required protocol features";
			fail_protocol(con);
			return -1;
		}
		clear_bit(CONNECTING, &con->state);
		con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
		con->connect_seq++;
		dout("process_connect got READY gseq %d cseq %d (%d)\n",
		     con->peer_global_seq,
		     le32_to_cpu(con->in_reply.connect_seq),
		     con->connect_seq);
		WARN_ON(con->connect_seq !=
			le32_to_cpu(con->in_reply.connect_seq));

		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
			set_bit(LOSSYTX, &con->state);

		prepare_read_tag(con);
		break;

	case CEPH_MSGR_TAG_WAIT:
		/*
		 * If there is a connection race (we are opening
		 * connections to each other), one of us may just have
		 * to WAIT. This shouldn't happen if we are the
		 * client.
		 */
		pr_err("process_connect peer connecting WAIT\n");

	default:
		pr_err("connect protocol error, will retry\n");
		con->error_msg = "protocol error, garbage tag during connect";
		return -1;
	}
	return 0;
}


/*
 * read (part of) an ack
 */
static int read_partial_ack(struct ceph_connection *con)
{
	int to = 0;

	return read_partial(con, &to, sizeof(con->in_temp_ack),
			    &con->in_temp_ack);
}

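/*
 * Illustrative note (added; not part of the original file): the
 * reliability bookkeeping in one place.  Outgoing messages get
 * hdr.seq = ++out_seq and stay on out_sent until acked; process_ack()
 * below drops everything with seq <= the acked value.  On a fault,
 * ceph_fault() splices out_sent back onto out_queue, so unacked
 * messages are resent with their original seq and the peer can
 * discard duplicates it already received.
 */
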
/*
 * We can finally discard anything that's been acked.
 */
static void process_ack(struct ceph_connection *con)
{
	struct ceph_msg *m;
	u64 ack = le64_to_cpu(con->in_temp_ack);
	u64 seq;

	while (!list_empty(&con->out_sent)) {
		m = list_first_entry(&con->out_sent, struct ceph_msg,
				     list_head);
		seq = le64_to_cpu(m->hdr.seq);
		if (seq > ack)
			break;
		dout("got ack for seq %llu type %d at %p\n", seq,
		     le16_to_cpu(m->hdr.type), m);
		ceph_msg_remove(m);
	}
	prepare_read_tag(con);
}




static int read_partial_message_section(struct ceph_connection *con,
					struct kvec *section,
					unsigned int sec_len, u32 *crc)
{
	int left;
	int ret;

	BUG_ON(!section);

	while (section->iov_len < sec_len) {
		BUG_ON(section->iov_base == NULL);
		left = sec_len - section->iov_len;
		ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
				       section->iov_len, left);
		if (ret <= 0)
			return ret;
		section->iov_len += ret;
		if (section->iov_len == sec_len)
			*crc = crc32c(0, section->iov_base,
				      section->iov_len);
	}

	return 1;
}

static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
				struct ceph_msg_header *hdr,
				int *skip);
/*
 * read (part of) a message.
 */
static int read_partial_message(struct ceph_connection *con)
{
	struct ceph_msg *m = con->in_msg;
	void *p;
	int ret;
	int to, left;
	unsigned front_len, middle_len, data_len, data_off;
	int datacrc = con->msgr->nocrc;
	int skip;
	u64 seq;

	dout("read_partial_message con %p msg %p\n", con, m);

	/* header */
	while (con->in_base_pos < sizeof(con->in_hdr)) {
		left = sizeof(con->in_hdr) - con->in_base_pos;
		ret = ceph_tcp_recvmsg(con->sock,
				       (char *)&con->in_hdr + con->in_base_pos,
				       left);
		if (ret <= 0)
			return ret;
		con->in_base_pos += ret;
		if (con->in_base_pos == sizeof(con->in_hdr)) {
			u32 crc = crc32c(0, (void *)&con->in_hdr,
				 sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
			if (crc != le32_to_cpu(con->in_hdr.crc)) {
				pr_err("read_partial_message bad hdr "
				       " crc %u != expected %u\n",
				       crc, con->in_hdr.crc);
				return -EBADMSG;
			}
		}
	}
	front_len = le32_to_cpu(con->in_hdr.front_len);
	if (front_len > CEPH_MSG_MAX_FRONT_LEN)
		return -EIO;
	middle_len = le32_to_cpu(con->in_hdr.middle_len);
	if (middle_len > CEPH_MSG_MAX_DATA_LEN)
		return -EIO;
	data_len = le32_to_cpu(con->in_hdr.data_len);
	if (data_len > CEPH_MSG_MAX_DATA_LEN)
		return -EIO;
	data_off = le16_to_cpu(con->in_hdr.data_off);

	/* verify seq# */
	seq = le64_to_cpu(con->in_hdr.seq);
	if ((s64)seq - (s64)con->in_seq < 1) {
		pr_info("skipping %s%lld %s seq %lld, expected %lld\n",
			ENTITY_NAME(con->peer_name),
			pr_addr(&con->peer_addr.in_addr),
			seq, con->in_seq + 1);
		con->in_base_pos = -front_len - middle_len - data_len -
			sizeof(m->footer);
		con->in_tag = CEPH_MSGR_TAG_READY;
		con->in_seq++;
		return 0;
	} else if ((s64)seq - (s64)con->in_seq > 1) {
		pr_err("read_partial_message bad seq %lld expected %lld\n",
		       seq, con->in_seq + 1);
		con->error_msg = "bad message sequence # for incoming message";
		return -EBADMSG;
	}

	/* allocate message? */
	if (!con->in_msg) {
		dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
		     con->in_hdr.front_len, con->in_hdr.data_len);
		con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
		if (skip) {
			/* skip this message */
			dout("alloc_msg returned NULL, skipping message\n");
			con->in_base_pos = -front_len - middle_len - data_len -
				sizeof(m->footer);
			con->in_tag = CEPH_MSGR_TAG_READY;
			con->in_seq++;
			return 0;
		}
		if (IS_ERR(con->in_msg)) {
			ret = PTR_ERR(con->in_msg);
			con->in_msg = NULL;
			con->error_msg =
				"error allocating memory for incoming message";
			return ret;
		}
		m = con->in_msg;
		m->front.iov_len = 0;    /* haven't read it yet */
		if (m->middle)
			m->middle->vec.iov_len = 0;

		con->in_msg_pos.page = 0;
		con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
		con->in_msg_pos.data_pos = 0;
	}

	/* front */
	ret = read_partial_message_section(con, &m->front, front_len,
					   &con->in_front_crc);
	if (ret <= 0)
		return ret;

	/* middle */
	if (m->middle) {
		ret = read_partial_message_section(con, &m->middle->vec,
						   middle_len,
						   &con->in_middle_crc);
		if (ret <= 0)
			return ret;
	}

	/* (page) data */
	while (con->in_msg_pos.data_pos < data_len) {
		left = min((int)(data_len - con->in_msg_pos.data_pos),
			   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
		BUG_ON(m->pages == NULL);
		p = kmap(m->pages[con->in_msg_pos.page]);
		ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
				       left);
		if (ret > 0 && datacrc)
			con->in_data_crc =
				crc32c(con->in_data_crc,
				       p + con->in_msg_pos.page_pos, ret);
		kunmap(m->pages[con->in_msg_pos.page]);
		if (ret <= 0)
			return ret;
		con->in_msg_pos.data_pos += ret;
		con->in_msg_pos.page_pos += ret;
		if (con->in_msg_pos.page_pos == PAGE_SIZE) {
			con->in_msg_pos.page_pos = 0;
			con->in_msg_pos.page++;
		}
	}

	/* footer */
	to = sizeof(m->hdr) + sizeof(m->footer);
	while (con->in_base_pos < to) {
		left = to - con->in_base_pos;
		ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
				       (con->in_base_pos - sizeof(m->hdr)),
				       left);
		if (ret <= 0)
			return ret;
		con->in_base_pos += ret;
	}
	dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
	     m, front_len, m->footer.front_crc, middle_len,
	     m->footer.middle_crc, data_len, m->footer.data_crc);

	/* crc ok? */
	if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
		pr_err("read_partial_message %p front crc %u != exp. %u\n",
		       m, con->in_front_crc, m->footer.front_crc);
		return -EBADMSG;
	}
	if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
		pr_err("read_partial_message %p middle crc %u != exp %u\n",
		       m, con->in_middle_crc, m->footer.middle_crc);
		return -EBADMSG;
	}
	if (datacrc &&
	    (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
	    con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
		pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
		       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
		return -EBADMSG;
	}

	return 1; /* done! */
}

/*
 * Process message. This happens in the worker thread. The callback should
 * be careful not to do anything that waits on other incoming messages or it
 * may deadlock.
 */
static void process_message(struct ceph_connection *con)
{
	struct ceph_msg *msg;

	msg = con->in_msg;
	con->in_msg = NULL;

	/* if first message, set peer_name */
	if (con->peer_name.type == 0)
		con->peer_name = msg->hdr.src.name;

	con->in_seq++;
	mutex_unlock(&con->mutex);

	dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
	     msg, le64_to_cpu(msg->hdr.seq),
	     ENTITY_NAME(msg->hdr.src.name),
	     le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.data_len),
	     con->in_front_crc, con->in_middle_crc, con->in_data_crc);
	con->ops->dispatch(con, msg);

	mutex_lock(&con->mutex);
	prepare_read_tag(con);
}

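/*
 * Illustrative note (added; not part of the original file):
 * process_message() drops con->mutex around ->dispatch() on purpose.
 * The dispatch callback (mon/osd/mds client code) may send messages
 * or sleep, and doing that under the connection mutex could deadlock
 * against the worker delivering the replies; hence the warning above
 * about not waiting on other incoming messages.
 */
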
/*
 * Write something to the socket. Called in a worker thread when the
 * socket appears to be writeable and we have something ready to send.
 */
static int try_write(struct ceph_connection *con)
{
	struct ceph_messenger *msgr = con->msgr;
	int ret = 1;

	dout("try_write start %p state %lu nref %d\n", con, con->state,
	     atomic_read(&con->nref));

	mutex_lock(&con->mutex);
more:
	dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);

	/* open the socket first? */
	if (con->sock == NULL) {
		/*
		 * if we were STANDBY and are reconnecting _this_
		 * connection, bump connect_seq now. Always bump
		 * global_seq.
		 */
		if (test_and_clear_bit(STANDBY, &con->state))
			con->connect_seq++;

		prepare_write_banner(msgr, con);
		prepare_write_connect(msgr, con, 1);
		prepare_read_banner(con);
		set_bit(CONNECTING, &con->state);
		clear_bit(NEGOTIATING, &con->state);

		BUG_ON(con->in_msg);
		con->in_tag = CEPH_MSGR_TAG_READY;
		dout("try_write initiating connect on %p new state %lu\n",
		     con, con->state);
		con->sock = ceph_tcp_connect(con);
		if (IS_ERR(con->sock)) {
			con->sock = NULL;
			con->error_msg = "connect error";
			ret = -1;
			goto out;
		}
	}

more_kvec:
	/* kvec data queued? */
	if (con->out_skip) {
		ret = write_partial_skip(con);
		if (ret <= 0)
			goto done;
		if (ret < 0) {
			dout("try_write write_partial_skip err %d\n", ret);
			goto done;
		}
	}
	if (con->out_kvec_left) {
		ret = write_partial_kvec(con);
		if (ret <= 0)
			goto done;
	}

	/* msg pages? */
	if (con->out_msg) {
		if (con->out_msg_done) {
			ceph_msg_put(con->out_msg);
			con->out_msg = NULL;   /* we're done with this one */
			goto do_next;
		}

		ret = write_partial_msg_pages(con);
		if (ret == 1)
			goto more_kvec;  /* we need to send the footer, too! */
		if (ret == 0)
			goto done;
		if (ret < 0) {
			dout("try_write write_partial_msg_pages err %d\n",
			     ret);
			goto done;
		}
	}

do_next:
	if (!test_bit(CONNECTING, &con->state)) {
		/* is anything else pending? */
		if (!list_empty(&con->out_queue)) {
			prepare_write_message(con);
			goto more;
		}
		if (con->in_seq > con->in_seq_acked) {
			prepare_write_ack(con);
			goto more;
		}
		if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
			prepare_write_keepalive(con);
			goto more;
		}
	}

	/* Nothing to do! */
	clear_bit(WRITE_PENDING, &con->state);
	dout("try_write nothing else to write.\n");
done:
	ret = 0;
out:
	mutex_unlock(&con->mutex);
	dout("try_write done on %p\n", con);
	return ret;
}



/*
 * Read what we can from the socket.
 */
static int try_read(struct ceph_connection *con)
{
	struct ceph_messenger *msgr;
	int ret = -1;

	if (!con->sock)
		return 0;

	if (test_bit(STANDBY, &con->state))
		return 0;

	dout("try_read start on %p\n", con);
	msgr = con->msgr;

	mutex_lock(&con->mutex);

more:
	dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
	     con->in_base_pos);
	if (test_bit(CONNECTING, &con->state)) {
		if (!test_bit(NEGOTIATING, &con->state)) {
			dout("try_read connecting\n");
			ret = read_partial_banner(con);
			if (ret <= 0)
				goto done;
			if (process_banner(con) < 0) {
				ret = -1;
				goto out;
			}
		}
		ret = read_partial_connect(con);
		if (ret <= 0)
			goto done;
		if (process_connect(con) < 0) {
			ret = -1;
			goto out;
		}
		goto more;
	}

	if (con->in_base_pos < 0) {
		/*
		 * skipping + discarding content.
		 *
		 * FIXME: there must be a better way to do this!
		 */
		static char buf[1024];
		int skip = min(1024, -con->in_base_pos);
		dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
		ret = ceph_tcp_recvmsg(con->sock, buf, skip);
		if (ret <= 0)
			goto done;
		con->in_base_pos += ret;
		if (con->in_base_pos)
			goto more;
	}
	if (con->in_tag == CEPH_MSGR_TAG_READY) {
		/*
		 * what's next?
		 */
		ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
		if (ret <= 0)
			goto done;
		dout("try_read got tag %d\n", (int)con->in_tag);
		switch (con->in_tag) {
		case CEPH_MSGR_TAG_MSG:
			prepare_read_message(con);
			break;
		case CEPH_MSGR_TAG_ACK:
			prepare_read_ack(con);
			break;
		case CEPH_MSGR_TAG_CLOSE:
			set_bit(CLOSED, &con->state);   /* fixme */
			goto done;
		default:
			goto bad_tag;
		}
	}
	if (con->in_tag == CEPH_MSGR_TAG_MSG) {
		ret = read_partial_message(con);
		if (ret <= 0) {
			switch (ret) {
			case -EBADMSG:
				con->error_msg = "bad crc";
				ret = -EIO;
				goto out;
			case -EIO:
				con->error_msg = "io error";
				goto out;
			default:
				goto done;
			}
		}
		if (con->in_tag == CEPH_MSGR_TAG_READY)
			goto more;
		process_message(con);
		goto more;
	}
	if (con->in_tag == CEPH_MSGR_TAG_ACK) {
		ret = read_partial_ack(con);
		if (ret <= 0)
			goto done;
		process_ack(con);
		goto more;
	}

done:
	ret = 0;
out:
	mutex_unlock(&con->mutex);
	dout("try_read done on %p\n", con);
	return ret;

bad_tag:
	pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
	con->error_msg = "protocol error, garbage tag";
	ret = -1;
	goto out;
}


/*
 * Atomically queue work on a connection. Bump @con reference to
 * avoid races with connection teardown.
 *
 * There is some trickery going on with QUEUED and BUSY because we
 * only want a _single_ thread operating on each connection at any
 * point in time, but we want to use all available CPUs.
 *
 * The worker thread only proceeds if it can atomically set BUSY. It
 * clears QUEUED and does its thing. When it thinks it's done, it
When it thinks it's done, it 1783 * clears BUSY, then rechecks QUEUED.. if it's set again, it loops 1784 * (tries again to set BUSY). 1785 * 1786 * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we 1787 * try to queue work. If that fails (work is already queued, or BUSY) 1788 * we give up (work also already being done or is queued) but leave QUEUED 1789 * set so that the worker thread will loop if necessary. 1790 */ 1791static void queue_con(struct ceph_connection *con) 1792{ 1793 if (test_bit(DEAD, &con->state)) { 1794 dout("queue_con %p ignoring: DEAD\n", 1795 con); 1796 return; 1797 } 1798 1799 if (!con->ops->get(con)) { 1800 dout("queue_con %p ref count 0\n", con); 1801 return; 1802 } 1803 1804 set_bit(QUEUED, &con->state); 1805 if (test_bit(BUSY, &con->state)) { 1806 dout("queue_con %p - already BUSY\n", con); 1807 con->ops->put(con); 1808 } else if (!queue_work(ceph_msgr_wq, &con->work.work)) { 1809 dout("queue_con %p - already queued\n", con); 1810 con->ops->put(con); 1811 } else { 1812 dout("queue_con %p\n", con); 1813 } 1814} 1815 1816/* 1817 * Do some work on a connection. Drop a connection ref when we're done. 1818 */ 1819static void con_work(struct work_struct *work) 1820{ 1821 struct ceph_connection *con = container_of(work, struct ceph_connection, 1822 work.work); 1823 int backoff = 0; 1824 1825more: 1826 if (test_and_set_bit(BUSY, &con->state) != 0) { 1827 dout("con_work %p BUSY already set\n", con); 1828 goto out; 1829 } 1830 dout("con_work %p start, clearing QUEUED\n", con); 1831 clear_bit(QUEUED, &con->state); 1832 1833 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ 1834 dout("con_work CLOSED\n"); 1835 con_close_socket(con); 1836 goto done; 1837 } 1838 if (test_and_clear_bit(OPENING, &con->state)) { 1839 /* reopen w/ new peer */ 1840 dout("con_work OPENING\n"); 1841 con_close_socket(con); 1842 } 1843 1844 if (test_and_clear_bit(SOCK_CLOSED, &con->state) || 1845 try_read(con) < 0 || 1846 try_write(con) < 0) { 1847 backoff = 1; 1848 ceph_fault(con); /* error/fault path */ 1849 } 1850 1851done: 1852 clear_bit(BUSY, &con->state); 1853 dout("con->state=%lu\n", con->state); 1854 if (test_bit(QUEUED, &con->state)) { 1855 if (!backoff || test_bit(OPENING, &con->state)) { 1856 dout("con_work %p QUEUED reset, looping\n", con); 1857 goto more; 1858 } 1859 dout("con_work %p QUEUED reset, but just faulted\n", con); 1860 clear_bit(QUEUED, &con->state); 1861 } 1862 dout("con_work %p done\n", con); 1863 1864out: 1865 con->ops->put(con); 1866} 1867 1868 1869/* 1870 * Generic error/fault handler. A retry mechanism is used with 1871 * exponential backoff 1872 */ 1873static void ceph_fault(struct ceph_connection *con) 1874{ 1875 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 1876 pr_addr(&con->peer_addr.in_addr), con->error_msg); 1877 dout("fault %p state %lu to peer %s\n", 1878 con, con->state, pr_addr(&con->peer_addr.in_addr)); 1879 1880 if (test_bit(LOSSYTX, &con->state)) { 1881 dout("fault on LOSSYTX channel\n"); 1882 goto out; 1883 } 1884 1885 mutex_lock(&con->mutex); 1886 if (test_bit(CLOSED, &con->state)) 1887 goto out_unlock; 1888 1889 con_close_socket(con); 1890 1891 if (con->in_msg) { 1892 ceph_msg_put(con->in_msg); 1893 con->in_msg = NULL; 1894 } 1895 1896 /* Requeue anything that hasn't been acked */ 1897 list_splice_init(&con->out_sent, &con->out_queue); 1898 1899 /* If there are no messages in the queue, place the connection 1900 * in a STANDBY state (i.e., don't try to reconnect just yet). 
/*
 * Generic error/fault handler. A retry mechanism is used with
 * exponential backoff
 */
static void ceph_fault(struct ceph_connection *con)
{
	pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
	       pr_addr(&con->peer_addr.in_addr), con->error_msg);
	dout("fault %p state %lu to peer %s\n",
	     con, con->state, pr_addr(&con->peer_addr.in_addr));

	if (test_bit(LOSSYTX, &con->state)) {
		dout("fault on LOSSYTX channel\n");
		goto out;
	}

	mutex_lock(&con->mutex);
	if (test_bit(CLOSED, &con->state))
		goto out_unlock;

	con_close_socket(con);

	if (con->in_msg) {
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	}

	/* Requeue anything that hasn't been acked */
	list_splice_init(&con->out_sent, &con->out_queue);

	/* If there are no messages in the queue, place the connection
	 * in a STANDBY state (i.e., don't try to reconnect just yet). */
	if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
		dout("fault setting STANDBY\n");
		set_bit(STANDBY, &con->state);
	} else {
		/* retry after a delay. */
		if (con->delay == 0)
			con->delay = BASE_DELAY_INTERVAL;
		else if (con->delay < MAX_DELAY_INTERVAL)
			con->delay *= 2;
		dout("fault queueing %p delay %lu\n", con, con->delay);
		con->ops->get(con);
		if (queue_delayed_work(ceph_msgr_wq, &con->work,
				       round_jiffies_relative(con->delay)) == 0)
			con->ops->put(con);
	}

out_unlock:
	mutex_unlock(&con->mutex);
out:
	/*
	 * in case we faulted due to authentication, invalidate our
	 * current tickets so that we can get new ones.
	 */
	if (con->auth_retry && con->ops->invalidate_authorizer) {
		dout("calling invalidate_authorizer()\n");
		con->ops->invalidate_authorizer(con);
	}

	if (con->ops->fault)
		con->ops->fault(con);
}



/*
 * create a new messenger instance
 */
struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
{
	struct ceph_messenger *msgr;

	msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
	if (msgr == NULL)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&msgr->global_seq_lock);

	/* the zero page is needed if a request is "canceled" while the message
	 * is being written over the socket */
	msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
	if (!msgr->zero_page) {
		kfree(msgr);
		return ERR_PTR(-ENOMEM);
	}
	kmap(msgr->zero_page);

	if (myaddr)
		msgr->inst.addr = *myaddr;

	/* select a random nonce */
	msgr->inst.addr.type = 0;
	get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
	encode_my_addr(msgr);

	dout("messenger_create %p\n", msgr);
	return msgr;
}

void ceph_messenger_destroy(struct ceph_messenger *msgr)
{
	dout("destroy %p\n", msgr);
	kunmap(msgr->zero_page);
	__free_page(msgr->zero_page);
	kfree(msgr);
	dout("destroyed messenger %p\n", msgr);
}

/*
 * Queue up an outgoing message on the given connection.
1980 */ 1981void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) 1982{ 1983 if (test_bit(CLOSED, &con->state)) { 1984 dout("con_send %p closed, dropping %p\n", con, msg); 1985 ceph_msg_put(msg); 1986 return; 1987 } 1988 1989 /* set src+dst */ 1990 msg->hdr.src.name = con->msgr->inst.name; 1991 msg->hdr.src.addr = con->msgr->my_enc_addr; 1992 msg->hdr.orig_src = msg->hdr.src; 1993 1994 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 1995 1996 msg->needs_out_seq = true; 1997 1998 /* queue */ 1999 mutex_lock(&con->mutex); 2000 BUG_ON(!list_empty(&msg->list_head)); 2001 list_add_tail(&msg->list_head, &con->out_queue); 2002 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 2003 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), 2004 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 2005 le32_to_cpu(msg->hdr.front_len), 2006 le32_to_cpu(msg->hdr.middle_len), 2007 le32_to_cpu(msg->hdr.data_len)); 2008 mutex_unlock(&con->mutex); 2009 2010 /* if there wasn't anything waiting to send before, queue 2011 * new work */ 2012 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2013 queue_con(con); 2014} 2015 2016/* 2017 * Revoke a message that was previously queued for send 2018 */ 2019void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) 2020{ 2021 mutex_lock(&con->mutex); 2022 if (!list_empty(&msg->list_head)) { 2023 dout("con_revoke %p msg %p\n", con, msg); 2024 list_del_init(&msg->list_head); 2025 ceph_msg_put(msg); 2026 msg->hdr.seq = 0; 2027 if (con->out_msg == msg) { 2028 ceph_msg_put(con->out_msg); 2029 con->out_msg = NULL; 2030 } 2031 if (con->out_kvec_is_msg) { 2032 con->out_skip = con->out_kvec_bytes; 2033 con->out_kvec_is_msg = false; 2034 } 2035 } else { 2036 dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg); 2037 } 2038 mutex_unlock(&con->mutex); 2039} 2040 2041/* 2042 * Revoke a message that we may be reading data into 2043 */ 2044void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) 2045{ 2046 mutex_lock(&con->mutex); 2047 if (con->in_msg && con->in_msg == msg) { 2048 unsigned front_len = le32_to_cpu(con->in_hdr.front_len); 2049 unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len); 2050 unsigned data_len = le32_to_cpu(con->in_hdr.data_len); 2051 2052 /* skip rest of message */ 2053 dout("con_revoke_pages %p msg %p revoked\n", con, msg); 2054 con->in_base_pos = con->in_base_pos - 2055 sizeof(struct ceph_msg_header) - 2056 front_len - 2057 middle_len - 2058 data_len - 2059 sizeof(struct ceph_msg_footer); 2060 ceph_msg_put(con->in_msg); 2061 con->in_msg = NULL; 2062 con->in_tag = CEPH_MSGR_TAG_READY; 2063 con->in_seq++; 2064 } else { 2065 dout("con_revoke_pages %p msg %p pages %p no-op\n", 2066 con, con->in_msg, msg); 2067 } 2068 mutex_unlock(&con->mutex); 2069} 2070 2071/* 2072 * Queue a keepalive byte to ensure the tcp connection is alive. 2073 */ 2074void ceph_con_keepalive(struct ceph_connection *con) 2075{ 2076 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && 2077 test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2078 queue_con(con); 2079} 2080 2081 2082/* 2083 * construct a new message with given type, size 2084 * the new msg has a ref count of 1. 
/*
 * construct a new message with given type, size
 * the new msg has a ref count of 1.
 */
struct ceph_msg *ceph_msg_new(int type, int front_len,
			      int page_len, int page_off, struct page **pages)
{
	struct ceph_msg *m;

	m = kmalloc(sizeof(*m), GFP_NOFS);
	if (m == NULL)
		goto out;
	kref_init(&m->kref);
	INIT_LIST_HEAD(&m->list_head);

	m->hdr.tid = 0;
	m->hdr.type = cpu_to_le16(type);
	m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
	m->hdr.version = 0;
	m->hdr.front_len = cpu_to_le32(front_len);
	m->hdr.middle_len = 0;
	m->hdr.data_len = cpu_to_le32(page_len);
	m->hdr.data_off = cpu_to_le16(page_off);
	m->hdr.reserved = 0;
	m->footer.front_crc = 0;
	m->footer.middle_crc = 0;
	m->footer.data_crc = 0;
	m->footer.flags = 0;
	m->front_max = front_len;
	m->front_is_vmalloc = false;
	m->more_to_follow = false;
	m->pool = NULL;

	/* middle and data: initialize these before the front allocation,
	 * so that the error path below can safely drop our ref (the
	 * release function inspects middle and pagelist) */
	m->middle = NULL;
	m->nr_pages = calc_pages_for(page_off, page_len);
	m->pages = pages;
	m->pagelist = NULL;

	/* front */
	if (front_len) {
		if (front_len > PAGE_CACHE_SIZE) {
			m->front.iov_base = __vmalloc(front_len, GFP_NOFS,
						      PAGE_KERNEL);
			m->front_is_vmalloc = true;
		} else {
			m->front.iov_base = kmalloc(front_len, GFP_NOFS);
		}
		if (m->front.iov_base == NULL) {
			pr_err("msg_new can't allocate %d bytes\n",
			       front_len);
			goto out2;
		}
	} else {
		m->front.iov_base = NULL;
	}
	m->front.iov_len = front_len;

	dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
	     m->nr_pages);
	return m;

out2:
	ceph_msg_put(m);
out:
	pr_err("msg_new can't create type %d len %d\n", type, front_len);
	return ERR_PTR(-ENOMEM);
}

/*
 * Allocate "middle" portion of a message, if it is needed and wasn't
 * allocated by alloc_msg.  This allows us to read a small fixed-size
 * per-type header in the front and then gracefully fail (i.e.,
 * propagate the error to the caller based on info in the front) when
 * the middle is too large.
 */
static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
{
	int type = le16_to_cpu(msg->hdr.type);
	int middle_len = le32_to_cpu(msg->hdr.middle_len);

	dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
	     ceph_msg_type_name(type), middle_len);
	BUG_ON(!middle_len);
	BUG_ON(msg->middle);

	msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
	if (!msg->middle)
		return -ENOMEM;
	return 0;
}
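
/*
 * Illustrative sketch (not part of this file): building and sending a
 * front-only message.  MY_MSG_TYPE and struct my_req are placeholders
 * for a caller-defined message type and payload; note that on failure
 * ceph_msg_new() returns an ERR_PTR, not NULL.
 *
 *	struct ceph_msg *msg;
 *
 *	msg = ceph_msg_new(MY_MSG_TYPE, sizeof(struct my_req), 0, 0, NULL);
 *	if (IS_ERR(msg))
 *		return PTR_ERR(msg);
 *	memcpy(msg->front.iov_base, req, sizeof(struct my_req));
 *	ceph_con_send(con, msg);	// consumes our ref
 */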
/*
 * Generic message allocator, for incoming messages.
 */
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
				       struct ceph_msg_header *hdr,
				       int *skip)
{
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	int middle_len = le32_to_cpu(hdr->middle_len);
	struct ceph_msg *msg = NULL;
	int ret;

	if (con->ops->alloc_msg) {
		mutex_unlock(&con->mutex);
		msg = con->ops->alloc_msg(con, hdr, skip);
		mutex_lock(&con->mutex);
		if (IS_ERR(msg))
			return msg;

		if (*skip)
			return NULL;
	}
	if (!msg) {
		*skip = 0;
		msg = ceph_msg_new(type, front_len, 0, 0, NULL);
		if (IS_ERR(msg)) {	/* ceph_msg_new returns an ERR_PTR,
					 * never NULL, on failure */
			pr_err("unable to allocate msg type %d len %d\n",
			       type, front_len);
			return msg;
		}
	}
	memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));

	if (middle_len) {
		ret = ceph_alloc_middle(con, msg);
		if (ret < 0) {
			ceph_msg_put(msg);
			return ERR_PTR(ret);	/* don't hand back a msg we
						 * just dropped our ref on */
		}
	}

	return msg;
}


/*
 * Free a generically kmalloc'd message.
 */
void ceph_msg_kfree(struct ceph_msg *m)
{
	dout("msg_kfree %p\n", m);
	if (m->front_is_vmalloc)
		vfree(m->front.iov_base);
	else
		kfree(m->front.iov_base);
	kfree(m);
}

/*
 * Drop a msg ref.  Destroy as needed.
 */
void ceph_msg_last_put(struct kref *kref)
{
	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);

	dout("ceph_msg_put last one on %p\n", m);
	WARN_ON(!list_empty(&m->list_head));

	/* drop middle, data, if any */
	if (m->middle) {
		ceph_buffer_put(m->middle);
		m->middle = NULL;
	}
	m->nr_pages = 0;
	m->pages = NULL;

	if (m->pagelist) {
		ceph_pagelist_release(m->pagelist);
		kfree(m->pagelist);
		m->pagelist = NULL;
	}

	if (m->pool)
		ceph_msgpool_put(m->pool, m);
	else
		ceph_msg_kfree(m);
}

void ceph_msg_dump(struct ceph_msg *msg)
{
	pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg,
		 msg->front_max, msg->nr_pages);
	print_hex_dump(KERN_DEBUG, "header: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->hdr, sizeof(msg->hdr), true);
	print_hex_dump(KERN_DEBUG, " front: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       msg->front.iov_base, msg->front.iov_len, true);
	if (msg->middle)
		print_hex_dump(KERN_DEBUG, "middle: ",
			       DUMP_PREFIX_OFFSET, 16, 1,
			       msg->middle->vec.iov_base,
			       msg->middle->vec.iov_len, true);
	print_hex_dump(KERN_DEBUG, "footer: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->footer, sizeof(msg->footer), true);
}
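
/*
 * Illustrative sketch (not part of this file): the shape of an
 * alloc_msg connection operation as ceph_alloc_msg() above expects.
 * Returning a msg hands one reference to the messenger; returning NULL
 * with *skip set tells it to read and discard the incoming message,
 * while NULL with *skip clear falls back to the generic allocator.
 * "my_alloc_msg" and "lookup_pending_reply" are placeholders, not real
 * helpers.
 *
 *	static struct ceph_msg *my_alloc_msg(struct ceph_connection *con,
 *					     struct ceph_msg_header *hdr,
 *					     int *skip)
 *	{
 *		struct ceph_msg *m = lookup_pending_reply(con, hdr);
 *
 *		*skip = 0;
 *		if (!m)
 *			*skip = 1;	// unknown tid: drop the message
 *		return m;
 *	}
 */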