Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v2.6.35-rc2 879 lines 22 kB view raw
1#include "ceph_debug.h" 2 3#include <linux/types.h> 4#include <linux/slab.h> 5#include <linux/random.h> 6#include <linux/sched.h> 7 8#include "mon_client.h" 9#include "super.h" 10#include "auth.h" 11#include "decode.h" 12 13/* 14 * Interact with Ceph monitor cluster. Handle requests for new map 15 * versions, and periodically resend as needed. Also implement 16 * statfs() and umount(). 17 * 18 * A small cluster of Ceph "monitors" are responsible for managing critical 19 * cluster configuration and state information. An odd number (e.g., 3, 5) 20 * of cmon daemons use a modified version of the Paxos part-time parliament 21 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 22 * list of clients who have mounted the file system. 23 * 24 * We maintain an open, active session with a monitor at all times in order to 25 * receive timely MDSMap updates. We periodically send a keepalive byte on the 26 * TCP socket to ensure we detect a failure. If the connection does break, we 27 * randomly hunt for a new monitor. Once the connection is reestablished, we 28 * resend any outstanding requests. 29 */ 30 31static const struct ceph_connection_operations mon_con_ops; 32 33static int __validate_auth(struct ceph_mon_client *monc); 34 35/* 36 * Decode a monmap blob (e.g., during mount). 37 */ 38struct ceph_monmap *ceph_monmap_decode(void *p, void *end) 39{ 40 struct ceph_monmap *m = NULL; 41 int i, err = -EINVAL; 42 struct ceph_fsid fsid; 43 u32 epoch, num_mon; 44 u16 version; 45 u32 len; 46 47 ceph_decode_32_safe(&p, end, len, bad); 48 ceph_decode_need(&p, end, len, bad); 49 50 dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); 51 52 ceph_decode_16_safe(&p, end, version, bad); 53 54 ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); 55 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 56 epoch = ceph_decode_32(&p); 57 58 num_mon = ceph_decode_32(&p); 59 ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); 60 61 if (num_mon >= CEPH_MAX_MON) 62 goto bad; 63 m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS); 64 if (m == NULL) 65 return ERR_PTR(-ENOMEM); 66 m->fsid = fsid; 67 m->epoch = epoch; 68 m->num_mon = num_mon; 69 ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); 70 for (i = 0; i < num_mon; i++) 71 ceph_decode_addr(&m->mon_inst[i].addr); 72 73 dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, 74 m->num_mon); 75 for (i = 0; i < m->num_mon; i++) 76 dout("monmap_decode mon%d is %s\n", i, 77 pr_addr(&m->mon_inst[i].addr.in_addr)); 78 return m; 79 80bad: 81 dout("monmap_decode failed with %d\n", err); 82 kfree(m); 83 return ERR_PTR(err); 84} 85 86/* 87 * return true if *addr is included in the monmap. 88 */ 89int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 90{ 91 int i; 92 93 for (i = 0; i < m->num_mon; i++) 94 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0) 95 return 1; 96 return 0; 97} 98 99/* 100 * Send an auth request. 101 */ 102static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) 103{ 104 monc->pending_auth = 1; 105 monc->m_auth->front.iov_len = len; 106 monc->m_auth->hdr.front_len = cpu_to_le32(len); 107 ceph_con_revoke(monc->con, monc->m_auth); 108 ceph_msg_get(monc->m_auth); /* keep our ref */ 109 ceph_con_send(monc->con, monc->m_auth); 110} 111 112/* 113 * Close monitor session, if any. 114 */ 115static void __close_session(struct ceph_mon_client *monc) 116{ 117 if (monc->con) { 118 dout("__close_session closing mon%d\n", monc->cur_mon); 119 ceph_con_revoke(monc->con, monc->m_auth); 120 ceph_con_close(monc->con); 121 monc->cur_mon = -1; 122 monc->pending_auth = 0; 123 ceph_auth_reset(monc->auth); 124 } 125} 126 127/* 128 * Open a session with a (new) monitor. 129 */ 130static int __open_session(struct ceph_mon_client *monc) 131{ 132 char r; 133 int ret; 134 135 if (monc->cur_mon < 0) { 136 get_random_bytes(&r, 1); 137 monc->cur_mon = r % monc->monmap->num_mon; 138 dout("open_session num=%d r=%d -> mon%d\n", 139 monc->monmap->num_mon, r, monc->cur_mon); 140 monc->sub_sent = 0; 141 monc->sub_renew_after = jiffies; /* i.e., expired */ 142 monc->want_next_osdmap = !!monc->want_next_osdmap; 143 144 dout("open_session mon%d opening\n", monc->cur_mon); 145 monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON; 146 monc->con->peer_name.num = cpu_to_le64(monc->cur_mon); 147 ceph_con_open(monc->con, 148 &monc->monmap->mon_inst[monc->cur_mon].addr); 149 150 /* initiatiate authentication handshake */ 151 ret = ceph_auth_build_hello(monc->auth, 152 monc->m_auth->front.iov_base, 153 monc->m_auth->front_max); 154 __send_prepared_auth_request(monc, ret); 155 } else { 156 dout("open_session mon%d already open\n", monc->cur_mon); 157 } 158 return 0; 159} 160 161static bool __sub_expired(struct ceph_mon_client *monc) 162{ 163 return time_after_eq(jiffies, monc->sub_renew_after); 164} 165 166/* 167 * Reschedule delayed work timer. 168 */ 169static void __schedule_delayed(struct ceph_mon_client *monc) 170{ 171 unsigned delay; 172 173 if (monc->cur_mon < 0 || __sub_expired(monc)) 174 delay = 10 * HZ; 175 else 176 delay = 20 * HZ; 177 dout("__schedule_delayed after %u\n", delay); 178 schedule_delayed_work(&monc->delayed_work, delay); 179} 180 181/* 182 * Send subscribe request for mdsmap and/or osdmap. 183 */ 184static void __send_subscribe(struct ceph_mon_client *monc) 185{ 186 dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n", 187 (unsigned)monc->sub_sent, __sub_expired(monc), 188 monc->want_next_osdmap); 189 if ((__sub_expired(monc) && !monc->sub_sent) || 190 monc->want_next_osdmap == 1) { 191 struct ceph_msg *msg = monc->m_subscribe; 192 struct ceph_mon_subscribe_item *i; 193 void *p, *end; 194 195 p = msg->front.iov_base; 196 end = p + msg->front_max; 197 198 dout("__send_subscribe to 'mdsmap' %u+\n", 199 (unsigned)monc->have_mdsmap); 200 if (monc->want_next_osdmap) { 201 dout("__send_subscribe to 'osdmap' %u\n", 202 (unsigned)monc->have_osdmap); 203 ceph_encode_32(&p, 3); 204 ceph_encode_string(&p, end, "osdmap", 6); 205 i = p; 206 i->have = cpu_to_le64(monc->have_osdmap); 207 i->onetime = 1; 208 p += sizeof(*i); 209 monc->want_next_osdmap = 2; /* requested */ 210 } else { 211 ceph_encode_32(&p, 2); 212 } 213 ceph_encode_string(&p, end, "mdsmap", 6); 214 i = p; 215 i->have = cpu_to_le64(monc->have_mdsmap); 216 i->onetime = 0; 217 p += sizeof(*i); 218 ceph_encode_string(&p, end, "monmap", 6); 219 i = p; 220 i->have = 0; 221 i->onetime = 0; 222 p += sizeof(*i); 223 224 msg->front.iov_len = p - msg->front.iov_base; 225 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 226 ceph_con_revoke(monc->con, msg); 227 ceph_con_send(monc->con, ceph_msg_get(msg)); 228 229 monc->sub_sent = jiffies | 1; /* never 0 */ 230 } 231} 232 233static void handle_subscribe_ack(struct ceph_mon_client *monc, 234 struct ceph_msg *msg) 235{ 236 unsigned seconds; 237 struct ceph_mon_subscribe_ack *h = msg->front.iov_base; 238 239 if (msg->front.iov_len < sizeof(*h)) 240 goto bad; 241 seconds = le32_to_cpu(h->duration); 242 243 mutex_lock(&monc->mutex); 244 if (monc->hunting) { 245 pr_info("mon%d %s session established\n", 246 monc->cur_mon, pr_addr(&monc->con->peer_addr.in_addr)); 247 monc->hunting = false; 248 } 249 dout("handle_subscribe_ack after %d seconds\n", seconds); 250 monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1; 251 monc->sub_sent = 0; 252 mutex_unlock(&monc->mutex); 253 return; 254bad: 255 pr_err("got corrupt subscribe-ack msg\n"); 256 ceph_msg_dump(msg); 257} 258 259/* 260 * Keep track of which maps we have 261 */ 262int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) 263{ 264 mutex_lock(&monc->mutex); 265 monc->have_mdsmap = got; 266 mutex_unlock(&monc->mutex); 267 return 0; 268} 269 270int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) 271{ 272 mutex_lock(&monc->mutex); 273 monc->have_osdmap = got; 274 monc->want_next_osdmap = 0; 275 mutex_unlock(&monc->mutex); 276 return 0; 277} 278 279/* 280 * Register interest in the next osdmap 281 */ 282void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) 283{ 284 dout("request_next_osdmap have %u\n", monc->have_osdmap); 285 mutex_lock(&monc->mutex); 286 if (!monc->want_next_osdmap) 287 monc->want_next_osdmap = 1; 288 if (monc->want_next_osdmap < 2) 289 __send_subscribe(monc); 290 mutex_unlock(&monc->mutex); 291} 292 293/* 294 * 295 */ 296int ceph_monc_open_session(struct ceph_mon_client *monc) 297{ 298 if (!monc->con) { 299 monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL); 300 if (!monc->con) 301 return -ENOMEM; 302 ceph_con_init(monc->client->msgr, monc->con); 303 monc->con->private = monc; 304 monc->con->ops = &mon_con_ops; 305 } 306 307 mutex_lock(&monc->mutex); 308 __open_session(monc); 309 __schedule_delayed(monc); 310 mutex_unlock(&monc->mutex); 311 return 0; 312} 313 314/* 315 * The monitor responds with mount ack indicate mount success. The 316 * included client ticket allows the client to talk to MDSs and OSDs. 317 */ 318static void ceph_monc_handle_map(struct ceph_mon_client *monc, 319 struct ceph_msg *msg) 320{ 321 struct ceph_client *client = monc->client; 322 struct ceph_monmap *monmap = NULL, *old = monc->monmap; 323 void *p, *end; 324 325 mutex_lock(&monc->mutex); 326 327 dout("handle_monmap\n"); 328 p = msg->front.iov_base; 329 end = p + msg->front.iov_len; 330 331 monmap = ceph_monmap_decode(p, end); 332 if (IS_ERR(monmap)) { 333 pr_err("problem decoding monmap, %d\n", 334 (int)PTR_ERR(monmap)); 335 goto out; 336 } 337 338 if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) { 339 kfree(monmap); 340 goto out; 341 } 342 343 client->monc.monmap = monmap; 344 kfree(old); 345 346out: 347 mutex_unlock(&monc->mutex); 348 wake_up(&client->auth_wq); 349} 350 351/* 352 * statfs 353 */ 354static struct ceph_mon_generic_request *__lookup_generic_req( 355 struct ceph_mon_client *monc, u64 tid) 356{ 357 struct ceph_mon_generic_request *req; 358 struct rb_node *n = monc->generic_request_tree.rb_node; 359 360 while (n) { 361 req = rb_entry(n, struct ceph_mon_generic_request, node); 362 if (tid < req->tid) 363 n = n->rb_left; 364 else if (tid > req->tid) 365 n = n->rb_right; 366 else 367 return req; 368 } 369 return NULL; 370} 371 372static void __insert_generic_request(struct ceph_mon_client *monc, 373 struct ceph_mon_generic_request *new) 374{ 375 struct rb_node **p = &monc->generic_request_tree.rb_node; 376 struct rb_node *parent = NULL; 377 struct ceph_mon_generic_request *req = NULL; 378 379 while (*p) { 380 parent = *p; 381 req = rb_entry(parent, struct ceph_mon_generic_request, node); 382 if (new->tid < req->tid) 383 p = &(*p)->rb_left; 384 else if (new->tid > req->tid) 385 p = &(*p)->rb_right; 386 else 387 BUG(); 388 } 389 390 rb_link_node(&new->node, parent, p); 391 rb_insert_color(&new->node, &monc->generic_request_tree); 392} 393 394static void release_generic_request(struct kref *kref) 395{ 396 struct ceph_mon_generic_request *req = 397 container_of(kref, struct ceph_mon_generic_request, kref); 398 399 if (req->reply) 400 ceph_msg_put(req->reply); 401 if (req->request) 402 ceph_msg_put(req->request); 403} 404 405static void put_generic_request(struct ceph_mon_generic_request *req) 406{ 407 kref_put(&req->kref, release_generic_request); 408} 409 410static void get_generic_request(struct ceph_mon_generic_request *req) 411{ 412 kref_get(&req->kref); 413} 414 415static struct ceph_msg *get_generic_reply(struct ceph_connection *con, 416 struct ceph_msg_header *hdr, 417 int *skip) 418{ 419 struct ceph_mon_client *monc = con->private; 420 struct ceph_mon_generic_request *req; 421 u64 tid = le64_to_cpu(hdr->tid); 422 struct ceph_msg *m; 423 424 mutex_lock(&monc->mutex); 425 req = __lookup_generic_req(monc, tid); 426 if (!req) { 427 dout("get_generic_reply %lld dne\n", tid); 428 *skip = 1; 429 m = NULL; 430 } else { 431 dout("get_generic_reply %lld got %p\n", tid, req->reply); 432 m = ceph_msg_get(req->reply); 433 /* 434 * we don't need to track the connection reading into 435 * this reply because we only have one open connection 436 * at a time, ever. 437 */ 438 } 439 mutex_unlock(&monc->mutex); 440 return m; 441} 442 443static void handle_statfs_reply(struct ceph_mon_client *monc, 444 struct ceph_msg *msg) 445{ 446 struct ceph_mon_generic_request *req; 447 struct ceph_mon_statfs_reply *reply = msg->front.iov_base; 448 u64 tid = le64_to_cpu(msg->hdr.tid); 449 450 if (msg->front.iov_len != sizeof(*reply)) 451 goto bad; 452 dout("handle_statfs_reply %p tid %llu\n", msg, tid); 453 454 mutex_lock(&monc->mutex); 455 req = __lookup_generic_req(monc, tid); 456 if (req) { 457 *(struct ceph_statfs *)req->buf = reply->st; 458 req->result = 0; 459 get_generic_request(req); 460 } 461 mutex_unlock(&monc->mutex); 462 if (req) { 463 complete(&req->completion); 464 put_generic_request(req); 465 } 466 return; 467 468bad: 469 pr_err("corrupt generic reply, no tid\n"); 470 ceph_msg_dump(msg); 471} 472 473/* 474 * Do a synchronous statfs(). 475 */ 476int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) 477{ 478 struct ceph_mon_generic_request *req; 479 struct ceph_mon_statfs *h; 480 int err; 481 482 req = kzalloc(sizeof(*req), GFP_NOFS); 483 if (!req) 484 return -ENOMEM; 485 486 kref_init(&req->kref); 487 req->buf = buf; 488 init_completion(&req->completion); 489 490 err = -ENOMEM; 491 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS); 492 if (!req->request) 493 goto out; 494 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS); 495 if (!req->reply) 496 goto out; 497 498 /* fill out request */ 499 h = req->request->front.iov_base; 500 h->monhdr.have_version = 0; 501 h->monhdr.session_mon = cpu_to_le16(-1); 502 h->monhdr.session_mon_tid = 0; 503 h->fsid = monc->monmap->fsid; 504 505 /* register request */ 506 mutex_lock(&monc->mutex); 507 req->tid = ++monc->last_tid; 508 req->request->hdr.tid = cpu_to_le64(req->tid); 509 __insert_generic_request(monc, req); 510 monc->num_generic_requests++; 511 mutex_unlock(&monc->mutex); 512 513 /* send request and wait */ 514 ceph_con_send(monc->con, ceph_msg_get(req->request)); 515 err = wait_for_completion_interruptible(&req->completion); 516 517 mutex_lock(&monc->mutex); 518 rb_erase(&req->node, &monc->generic_request_tree); 519 monc->num_generic_requests--; 520 mutex_unlock(&monc->mutex); 521 522 if (!err) 523 err = req->result; 524 525out: 526 kref_put(&req->kref, release_generic_request); 527 return err; 528} 529 530/* 531 * Resend pending statfs requests. 532 */ 533static void __resend_generic_request(struct ceph_mon_client *monc) 534{ 535 struct ceph_mon_generic_request *req; 536 struct rb_node *p; 537 538 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 539 req = rb_entry(p, struct ceph_mon_generic_request, node); 540 ceph_con_revoke(monc->con, req->request); 541 ceph_con_send(monc->con, ceph_msg_get(req->request)); 542 } 543} 544 545/* 546 * Delayed work. If we haven't mounted yet, retry. Otherwise, 547 * renew/retry subscription as needed (in case it is timing out, or we 548 * got an ENOMEM). And keep the monitor connection alive. 549 */ 550static void delayed_work(struct work_struct *work) 551{ 552 struct ceph_mon_client *monc = 553 container_of(work, struct ceph_mon_client, delayed_work.work); 554 555 dout("monc delayed_work\n"); 556 mutex_lock(&monc->mutex); 557 if (monc->hunting) { 558 __close_session(monc); 559 __open_session(monc); /* continue hunting */ 560 } else { 561 ceph_con_keepalive(monc->con); 562 563 __validate_auth(monc); 564 565 if (monc->auth->ops->is_authenticated(monc->auth)) 566 __send_subscribe(monc); 567 } 568 __schedule_delayed(monc); 569 mutex_unlock(&monc->mutex); 570} 571 572/* 573 * On startup, we build a temporary monmap populated with the IPs 574 * provided by mount(2). 575 */ 576static int build_initial_monmap(struct ceph_mon_client *monc) 577{ 578 struct ceph_mount_args *args = monc->client->mount_args; 579 struct ceph_entity_addr *mon_addr = args->mon_addr; 580 int num_mon = args->num_mon; 581 int i; 582 583 /* build initial monmap */ 584 monc->monmap = kzalloc(sizeof(*monc->monmap) + 585 num_mon*sizeof(monc->monmap->mon_inst[0]), 586 GFP_KERNEL); 587 if (!monc->monmap) 588 return -ENOMEM; 589 for (i = 0; i < num_mon; i++) { 590 monc->monmap->mon_inst[i].addr = mon_addr[i]; 591 monc->monmap->mon_inst[i].addr.nonce = 0; 592 monc->monmap->mon_inst[i].name.type = 593 CEPH_ENTITY_TYPE_MON; 594 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i); 595 } 596 monc->monmap->num_mon = num_mon; 597 monc->have_fsid = false; 598 599 /* release addr memory */ 600 kfree(args->mon_addr); 601 args->mon_addr = NULL; 602 args->num_mon = 0; 603 return 0; 604} 605 606int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) 607{ 608 int err = 0; 609 610 dout("init\n"); 611 memset(monc, 0, sizeof(*monc)); 612 monc->client = cl; 613 monc->monmap = NULL; 614 mutex_init(&monc->mutex); 615 616 err = build_initial_monmap(monc); 617 if (err) 618 goto out; 619 620 monc->con = NULL; 621 622 /* authentication */ 623 monc->auth = ceph_auth_init(cl->mount_args->name, 624 cl->mount_args->secret); 625 if (IS_ERR(monc->auth)) 626 return PTR_ERR(monc->auth); 627 monc->auth->want_keys = 628 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 629 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; 630 631 /* msgs */ 632 err = -ENOMEM; 633 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, 634 sizeof(struct ceph_mon_subscribe_ack), 635 GFP_NOFS); 636 if (!monc->m_subscribe_ack) 637 goto out_monmap; 638 639 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS); 640 if (!monc->m_subscribe) 641 goto out_subscribe_ack; 642 643 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS); 644 if (!monc->m_auth_reply) 645 goto out_subscribe; 646 647 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS); 648 monc->pending_auth = 0; 649 if (!monc->m_auth) 650 goto out_auth_reply; 651 652 monc->cur_mon = -1; 653 monc->hunting = true; 654 monc->sub_renew_after = jiffies; 655 monc->sub_sent = 0; 656 657 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 658 monc->generic_request_tree = RB_ROOT; 659 monc->num_generic_requests = 0; 660 monc->last_tid = 0; 661 662 monc->have_mdsmap = 0; 663 monc->have_osdmap = 0; 664 monc->want_next_osdmap = 1; 665 return 0; 666 667out_auth_reply: 668 ceph_msg_put(monc->m_auth_reply); 669out_subscribe: 670 ceph_msg_put(monc->m_subscribe); 671out_subscribe_ack: 672 ceph_msg_put(monc->m_subscribe_ack); 673out_monmap: 674 kfree(monc->monmap); 675out: 676 return err; 677} 678 679void ceph_monc_stop(struct ceph_mon_client *monc) 680{ 681 dout("stop\n"); 682 cancel_delayed_work_sync(&monc->delayed_work); 683 684 mutex_lock(&monc->mutex); 685 __close_session(monc); 686 if (monc->con) { 687 monc->con->private = NULL; 688 monc->con->ops->put(monc->con); 689 monc->con = NULL; 690 } 691 mutex_unlock(&monc->mutex); 692 693 ceph_auth_destroy(monc->auth); 694 695 ceph_msg_put(monc->m_auth); 696 ceph_msg_put(monc->m_auth_reply); 697 ceph_msg_put(monc->m_subscribe); 698 ceph_msg_put(monc->m_subscribe_ack); 699 700 kfree(monc->monmap); 701} 702 703static void handle_auth_reply(struct ceph_mon_client *monc, 704 struct ceph_msg *msg) 705{ 706 int ret; 707 int was_auth = 0; 708 709 mutex_lock(&monc->mutex); 710 if (monc->auth->ops) 711 was_auth = monc->auth->ops->is_authenticated(monc->auth); 712 monc->pending_auth = 0; 713 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 714 msg->front.iov_len, 715 monc->m_auth->front.iov_base, 716 monc->m_auth->front_max); 717 if (ret < 0) { 718 monc->client->auth_err = ret; 719 wake_up(&monc->client->auth_wq); 720 } else if (ret > 0) { 721 __send_prepared_auth_request(monc, ret); 722 } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) { 723 dout("authenticated, starting session\n"); 724 725 monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 726 monc->client->msgr->inst.name.num = monc->auth->global_id; 727 728 __send_subscribe(monc); 729 __resend_generic_request(monc); 730 } 731 mutex_unlock(&monc->mutex); 732} 733 734static int __validate_auth(struct ceph_mon_client *monc) 735{ 736 int ret; 737 738 if (monc->pending_auth) 739 return 0; 740 741 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, 742 monc->m_auth->front_max); 743 if (ret <= 0) 744 return ret; /* either an error, or no need to authenticate */ 745 __send_prepared_auth_request(monc, ret); 746 return 0; 747} 748 749int ceph_monc_validate_auth(struct ceph_mon_client *monc) 750{ 751 int ret; 752 753 mutex_lock(&monc->mutex); 754 ret = __validate_auth(monc); 755 mutex_unlock(&monc->mutex); 756 return ret; 757} 758 759/* 760 * handle incoming message 761 */ 762static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 763{ 764 struct ceph_mon_client *monc = con->private; 765 int type = le16_to_cpu(msg->hdr.type); 766 767 if (!monc) 768 return; 769 770 switch (type) { 771 case CEPH_MSG_AUTH_REPLY: 772 handle_auth_reply(monc, msg); 773 break; 774 775 case CEPH_MSG_MON_SUBSCRIBE_ACK: 776 handle_subscribe_ack(monc, msg); 777 break; 778 779 case CEPH_MSG_STATFS_REPLY: 780 handle_statfs_reply(monc, msg); 781 break; 782 783 case CEPH_MSG_MON_MAP: 784 ceph_monc_handle_map(monc, msg); 785 break; 786 787 case CEPH_MSG_MDS_MAP: 788 ceph_mdsc_handle_map(&monc->client->mdsc, msg); 789 break; 790 791 case CEPH_MSG_OSD_MAP: 792 ceph_osdc_handle_map(&monc->client->osdc, msg); 793 break; 794 795 default: 796 pr_err("received unknown message type %d %s\n", type, 797 ceph_msg_type_name(type)); 798 } 799 ceph_msg_put(msg); 800} 801 802/* 803 * Allocate memory for incoming message 804 */ 805static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, 806 struct ceph_msg_header *hdr, 807 int *skip) 808{ 809 struct ceph_mon_client *monc = con->private; 810 int type = le16_to_cpu(hdr->type); 811 int front_len = le32_to_cpu(hdr->front_len); 812 struct ceph_msg *m = NULL; 813 814 *skip = 0; 815 816 switch (type) { 817 case CEPH_MSG_MON_SUBSCRIBE_ACK: 818 m = ceph_msg_get(monc->m_subscribe_ack); 819 break; 820 case CEPH_MSG_STATFS_REPLY: 821 return get_generic_reply(con, hdr, skip); 822 case CEPH_MSG_AUTH_REPLY: 823 m = ceph_msg_get(monc->m_auth_reply); 824 break; 825 case CEPH_MSG_MON_MAP: 826 case CEPH_MSG_MDS_MAP: 827 case CEPH_MSG_OSD_MAP: 828 m = ceph_msg_new(type, front_len, GFP_NOFS); 829 break; 830 } 831 832 if (!m) { 833 pr_info("alloc_msg unknown type %d\n", type); 834 *skip = 1; 835 } 836 return m; 837} 838 839/* 840 * If the monitor connection resets, pick a new monitor and resubmit 841 * any pending requests. 842 */ 843static void mon_fault(struct ceph_connection *con) 844{ 845 struct ceph_mon_client *monc = con->private; 846 847 if (!monc) 848 return; 849 850 dout("mon_fault\n"); 851 mutex_lock(&monc->mutex); 852 if (!con->private) 853 goto out; 854 855 if (monc->con && !monc->hunting) 856 pr_info("mon%d %s session lost, " 857 "hunting for new mon\n", monc->cur_mon, 858 pr_addr(&monc->con->peer_addr.in_addr)); 859 860 __close_session(monc); 861 if (!monc->hunting) { 862 /* start hunting */ 863 monc->hunting = true; 864 __open_session(monc); 865 } else { 866 /* already hunting, let's wait a bit */ 867 __schedule_delayed(monc); 868 } 869out: 870 mutex_unlock(&monc->mutex); 871} 872 873static const struct ceph_connection_operations mon_con_ops = { 874 .get = ceph_con_get, 875 .put = ceph_con_put, 876 .dispatch = dispatch, 877 .fault = mon_fault, 878 .alloc_msg = mon_alloc_msg, 879};