Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
ceph: preserve seq # on requeued messages after transient transport errors
ceph: fix cap removal races
ceph: zero unused message header, footer fields
ceph: fix locking for waking session requests after reconnect
ceph: resubmit requests on pg mapping change (not just primary change)
ceph: fix open file counting on snapped inodes when mds returns no caps
ceph: unregister osd request on failure
ceph: don't use writeback_control in writepages completion
ceph: unregister bdi before kill_anon_super releases device name

+116 -49
-6
fs/ceph/addr.c
··· 504 504 int i; 505 505 struct ceph_snap_context *snapc = req->r_snapc; 506 506 struct address_space *mapping = inode->i_mapping; 507 - struct writeback_control *wbc = req->r_wbc; 508 507 __s32 rc = -EIO; 509 508 u64 bytes = 0; 510 509 struct ceph_client *client = ceph_inode_to_client(inode); ··· 545 546 clear_bdi_congested(&client->backing_dev_info, 546 547 BLK_RW_ASYNC); 547 548 548 - if (i >= wrote) { 549 - dout("inode %p skipping page %p\n", inode, page); 550 - wbc->pages_skipped++; 551 - } 552 549 ceph_put_snap_context((void *)page->private); 553 550 page->private = 0; 554 551 ClearPagePrivate(page); ··· 794 799 alloc_page_vec(client, req); 795 800 req->r_callback = writepages_finish; 796 801 req->r_inode = inode; 797 - req->r_wbc = wbc; 798 802 } 799 803 800 804 /* note position of first page in pvec */
+12 -7
fs/ceph/caps.c
··· 858 858 } 859 859 860 860 /* 861 + * Remove a cap. Take steps to deal with a racing iterate_session_caps. 862 + * 861 863 * caller should hold i_lock. 862 864 * caller will not hold session s_mutex if called from destroy_inode. 863 865 */ ··· 868 866 struct ceph_mds_session *session = cap->session; 869 867 struct ceph_inode_info *ci = cap->ci; 870 868 struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc; 869 + int removed = 0; 871 870 872 871 dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode); 873 - 874 - /* remove from inode list */ 875 - rb_erase(&cap->ci_node, &ci->i_caps); 876 - cap->ci = NULL; 877 - if (ci->i_auth_cap == cap) 878 - ci->i_auth_cap = NULL; 879 872 880 873 /* remove from session list */ 881 874 spin_lock(&session->s_cap_lock); ··· 882 885 list_del_init(&cap->session_caps); 883 886 session->s_nr_caps--; 884 887 cap->session = NULL; 888 + removed = 1; 885 889 } 890 + /* protect backpointer with s_cap_lock: see iterate_session_caps */ 891 + cap->ci = NULL; 886 892 spin_unlock(&session->s_cap_lock); 887 893 888 - if (cap->session == NULL) 894 + /* remove from inode list */ 895 + rb_erase(&cap->ci_node, &ci->i_caps); 896 + if (ci->i_auth_cap == cap) 897 + ci->i_auth_cap = NULL; 898 + 899 + if (removed) 889 900 ceph_put_cap(cap); 890 901 891 902 if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
+4
fs/ceph/inode.c
··· 733 733 __ceph_get_fmode(ci, cap_fmode); 734 734 spin_unlock(&inode->i_lock); 735 735 } 736 + } else if (cap_fmode >= 0) { 737 + pr_warning("mds issued no caps on %llx.%llx\n", 738 + ceph_vinop(inode)); 739 + __ceph_get_fmode(ci, cap_fmode); 736 740 } 737 741 738 742 /* update delegation info? */
+19 -15
fs/ceph/mds_client.c
··· 736 736 } 737 737 738 738 /* 739 - * Helper to safely iterate over all caps associated with a session. 739 + * Helper to safely iterate over all caps associated with a session, with 740 + * special care taken to handle a racing __ceph_remove_cap(). 740 741 * 741 - * caller must hold session s_mutex 742 + * Caller must hold session s_mutex. 742 743 */ 743 744 static int iterate_session_caps(struct ceph_mds_session *session, 744 745 int (*cb)(struct inode *, struct ceph_cap *, ··· 2137 2136 struct ceph_mds_session *session = NULL; 2138 2137 struct ceph_msg *reply; 2139 2138 struct rb_node *p; 2140 - int err; 2139 + int err = -ENOMEM; 2141 2140 struct ceph_pagelist *pagelist; 2142 2141 2143 2142 pr_info("reconnect to recovering mds%d\n", mds); ··· 2186 2185 goto fail; 2187 2186 err = iterate_session_caps(session, encode_caps_cb, pagelist); 2188 2187 if (err < 0) 2189 - goto out; 2188 + goto fail; 2190 2189 2191 2190 /* 2192 2191 * snaprealms. we provide mds with the ino, seq (version), and ··· 2214 2213 reply->nr_pages = calc_pages_for(0, pagelist->length); 2215 2214 ceph_con_send(&session->s_con, reply); 2216 2215 2217 - if (session) { 2218 - session->s_state = CEPH_MDS_SESSION_OPEN; 2219 - __wake_requests(mdsc, &session->s_waiting); 2220 - } 2216 + session->s_state = CEPH_MDS_SESSION_OPEN; 2217 + mutex_unlock(&session->s_mutex); 2221 2218 2222 - out: 2219 + mutex_lock(&mdsc->mutex); 2220 + __wake_requests(mdsc, &session->s_waiting); 2221 + mutex_unlock(&mdsc->mutex); 2222 + 2223 + ceph_put_mds_session(session); 2224 + 2223 2225 up_read(&mdsc->snap_rwsem); 2224 - if (session) { 2225 - mutex_unlock(&session->s_mutex); 2226 - ceph_put_mds_session(session); 2227 - } 2228 2226 mutex_lock(&mdsc->mutex); 2229 2227 return; 2230 2228 2231 2229 fail: 2232 2230 ceph_msg_put(reply); 2231 + up_read(&mdsc->snap_rwsem); 2232 + mutex_unlock(&session->s_mutex); 2233 + ceph_put_mds_session(session); 2233 2234 fail_nomsg: 2234 2235 ceph_pagelist_release(pagelist); 2235 2236 
kfree(pagelist); 2236 2237 fail_nopagelist: 2237 - pr_err("ENOMEM preparing reconnect for mds%d\n", mds); 2238 - goto out; 2238 + pr_err("error %d preparing reconnect for mds%d\n", err, mds); 2239 + mutex_lock(&mdsc->mutex); 2240 + return; 2239 2241 } 2240 2242 2241 2243
+15 -2
fs/ceph/messenger.c
··· 492 492 list_move_tail(&m->list_head, &con->out_sent); 493 493 } 494 494 495 - m->hdr.seq = cpu_to_le64(++con->out_seq); 495 + /* 496 + * only assign outgoing seq # if we haven't sent this message 497 + * yet. if it is requeued, resend with its original seq. 498 + */ 499 + if (m->needs_out_seq) { 500 + m->hdr.seq = cpu_to_le64(++con->out_seq); 501 + m->needs_out_seq = false; 502 + } 496 503 497 504 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", 498 505 m, con->out_seq, le16_to_cpu(m->hdr.type), ··· 1993 1986 1994 1987 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 1995 1988 1989 + msg->needs_out_seq = true; 1990 + 1996 1991 /* queue */ 1997 1992 mutex_lock(&con->mutex); 1998 1993 BUG_ON(!list_empty(&msg->list_head)); ··· 2094 2085 kref_init(&m->kref); 2095 2086 INIT_LIST_HEAD(&m->list_head); 2088 + m->hdr.tid = 0; 2097 2089 m->hdr.type = cpu_to_le16(type); 2090 + m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); 2091 + m->hdr.version = 0; 2098 2092 m->hdr.front_len = cpu_to_le32(front_len); 2099 2093 m->hdr.middle_len = 0; 2100 2094 m->hdr.data_len = cpu_to_le32(page_len); 2101 2095 m->hdr.data_off = cpu_to_le16(page_off); 2102 - m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); 2096 + m->hdr.reserved = 0; 2103 2097 m->footer.front_crc = 0; 2104 2098 m->footer.middle_crc = 0; 2105 2099 m->footer.data_crc = 0; 2100 + m->footer.flags = 0; 2106 2101 m->front_max = front_len; 2107 2102 m->front_is_vmalloc = false; 2108 2103 m->more_to_follow = false;
+1
fs/ceph/messenger.h
··· 86 86 struct kref kref; 87 87 bool front_is_vmalloc; 88 88 bool more_to_follow; 89 + bool needs_out_seq; 89 90 int front_max; 90 91 91 92 struct ceph_msgpool *pool;
+20 -6
fs/ceph/osd_client.c
··· 565 565 { 566 566 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; 567 567 struct ceph_pg pgid; 568 - int o = -1; 568 + int acting[CEPH_PG_MAX_SIZE]; 569 + int o = -1, num = 0; 569 570 int err; 570 571 571 572 dout("map_osds %p tid %lld\n", req, req->r_tid); ··· 577 576 pgid = reqhead->layout.ol_pgid; 578 577 req->r_pgid = pgid; 579 578 580 - o = ceph_calc_pg_primary(osdc->osdmap, pgid); 579 + err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 580 + if (err > 0) { 581 + o = acting[0]; 582 + num = err; 583 + } 581 584 582 585 if ((req->r_osd && req->r_osd->o_osd == o && 583 - req->r_sent >= req->r_osd->o_incarnation) || 586 + req->r_sent >= req->r_osd->o_incarnation && 587 + req->r_num_pg_osds == num && 588 + memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 584 589 (req->r_osd == NULL && o == -1)) 585 590 return 0; /* no change */ 586 591 587 592 dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n", 588 593 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, 589 594 req->r_osd ? 
req->r_osd->o_osd : -1); 595 + 596 + /* record full pg acting set */ 597 + memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); 598 + req->r_num_pg_osds = num; 590 599 591 600 if (req->r_osd) { 592 601 __cancel_request(req); ··· 623 612 __remove_osd_from_lru(req->r_osd); 624 613 list_add(&req->r_osd_item, &req->r_osd->o_requests); 625 614 } 626 - err = 1; /* osd changed */ 615 + err = 1; /* osd or pg changed */ 627 616 628 617 out: 629 618 return err; ··· 790 779 struct ceph_osd_request *req; 791 780 u64 tid; 792 781 int numops, object_len, flags; 782 + s32 result; 793 783 794 784 tid = le64_to_cpu(msg->hdr.tid); 795 785 if (msg->front.iov_len < sizeof(*rhead)) 796 786 goto bad; 797 787 numops = le32_to_cpu(rhead->num_ops); 798 788 object_len = le32_to_cpu(rhead->object_len); 789 + result = le32_to_cpu(rhead->result); 799 790 if (msg->front.iov_len != sizeof(*rhead) + object_len + 800 791 numops * sizeof(struct ceph_osd_op)) 801 792 goto bad; 802 - dout("handle_reply %p tid %llu\n", msg, tid); 793 + dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 803 794 804 795 /* lookup */ 805 796 mutex_lock(&osdc->request_mutex); ··· 847 834 dout("handle_reply tid %llu flags %d\n", tid, flags); 848 835 849 836 /* either this is a read, or we got the safe response */ 850 - if ((flags & CEPH_OSD_FLAG_ONDISK) || 837 + if (result < 0 || 838 + (flags & CEPH_OSD_FLAG_ONDISK) || 851 839 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 852 840 __unregister_request(osdc, req); 853 841
+2 -1
fs/ceph/osd_client.h
··· 48 48 struct list_head r_osd_item; 49 49 struct ceph_osd *r_osd; 50 50 struct ceph_pg r_pgid; 51 + int r_pg_osds[CEPH_PG_MAX_SIZE]; 52 + int r_num_pg_osds; 51 53 52 54 struct ceph_connection *r_con_filling_msg; 53 55 ··· 68 66 struct list_head r_unsafe_item; 69 67 70 68 struct inode *r_inode; /* for use by callbacks */ 71 - struct writeback_control *r_wbc; /* ditto */ 72 69 73 70 char r_oid[40]; /* object name */ 74 71 int r_oid_len;
+24 -5
fs/ceph/osdmap.c
··· 1041 1041 } 1042 1042 1043 1043 /* 1044 + * Return acting set for given pgid. 1045 + */ 1046 + int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, 1047 + int *acting) 1048 + { 1049 + int rawosds[CEPH_PG_MAX_SIZE], *osds; 1050 + int i, o, num = CEPH_PG_MAX_SIZE; 1051 + 1052 + osds = calc_pg_raw(osdmap, pgid, rawosds, &num); 1053 + if (!osds) 1054 + return -1; 1055 + 1056 + /* primary is first up osd */ 1057 + o = 0; 1058 + for (i = 0; i < num; i++) 1059 + if (ceph_osd_is_up(osdmap, osds[i])) 1060 + acting[o++] = osds[i]; 1061 + return o; 1062 + } 1063 + 1064 + /* 1044 1065 * Return primary osd for given pgid, or -1 if none. 1045 1066 */ 1046 1067 int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) 1047 1068 { 1048 - int rawosds[10], *osds; 1049 - int i, num = ARRAY_SIZE(rawosds); 1069 + int rawosds[CEPH_PG_MAX_SIZE], *osds; 1070 + int i, num = CEPH_PG_MAX_SIZE; 1050 1071 1051 1072 osds = calc_pg_raw(osdmap, pgid, rawosds, &num); 1052 1073 if (!osds) ··· 1075 1054 1076 1055 /* primary is first up osd */ 1077 1056 for (i = 0; i < num; i++) 1078 - if (ceph_osd_is_up(osdmap, osds[i])) { 1057 + if (ceph_osd_is_up(osdmap, osds[i])) 1079 1058 return osds[i]; 1080 - break; 1081 - } 1082 1059 return -1; 1083 1060 }
+2
fs/ceph/osdmap.h
··· 120 120 const char *oid, 121 121 struct ceph_file_layout *fl, 122 122 struct ceph_osdmap *osdmap); 123 + extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, 124 + int *acting); 123 125 extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, 124 126 struct ceph_pg pgid); 125 127
+1
fs/ceph/rados.h
··· 58 58 #define CEPH_PG_LAYOUT_LINEAR 2 59 59 #define CEPH_PG_LAYOUT_HYBRID 3 60 60 61 + #define CEPH_PG_MAX_SIZE 16 /* max # osds in a single pg */ 61 62 62 63 /* 63 64 * placement group.
+16 -7
fs/ceph/super.c
··· 47 47 */ 48 48 static void ceph_put_super(struct super_block *s) 49 49 { 50 - struct ceph_client *cl = ceph_client(s); 50 + struct ceph_client *client = ceph_sb_to_client(s); 51 51 52 52 dout("put_super\n"); 53 - ceph_mdsc_close_sessions(&cl->mdsc); 53 + ceph_mdsc_close_sessions(&client->mdsc); 54 + 55 + /* 56 + * ensure we release the bdi before put_anon_super releases 57 + * the device name. 58 + */ 59 + if (s->s_bdi == &client->backing_dev_info) { 60 + bdi_unregister(&client->backing_dev_info); 61 + s->s_bdi = NULL; 62 + } 63 + 54 64 return; 55 65 } 56 66 ··· 646 636 destroy_workqueue(client->pg_inv_wq); 647 637 destroy_workqueue(client->trunc_wq); 648 638 639 + bdi_destroy(&client->backing_dev_info); 640 + 649 641 if (client->msgr) 650 642 ceph_messenger_destroy(client->msgr); 651 643 mempool_destroy(client->wb_pagevec_pool); ··· 888 876 { 889 877 int err; 890 878 891 - sb->s_bdi = &client->backing_dev_info; 892 - 893 879 /* set ra_pages based on rsize mount option? */ 894 880 if (client->mount_args->rsize >= PAGE_CACHE_SIZE) 895 881 client->backing_dev_info.ra_pages = 896 882 (client->mount_args->rsize + PAGE_CACHE_SIZE - 1) 897 883 >> PAGE_SHIFT; 898 884 err = bdi_register_dev(&client->backing_dev_info, sb->s_dev); 885 + if (!err) 886 + sb->s_bdi = &client->backing_dev_info; 899 887 return err; 900 888 } 901 889 ··· 969 957 dout("kill_sb %p\n", s); 970 958 ceph_mdsc_pre_umount(&client->mdsc); 971 959 kill_anon_super(s); /* will call put_super after sb is r/o */ 972 - if (s->s_bdi == &client->backing_dev_info) 973 - bdi_unregister(&client->backing_dev_info); 974 - bdi_destroy(&client->backing_dev_info); 975 960 ceph_destroy_client(client); 976 961 } 977 962