Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge branch 'for-linus-bugs' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull ceph bug-fixes from Sage Weil:
"These include a couple of fixes to the new fscache code that went in
during the last cycle (which will need to go to stable@ shortly as well),
a couple of client-side directory fragmentation fixes, a fix for a race
in the cap release queuing path, and a couple of race fixes in the
request abort and resend code.

Obviously some of this could have gone into 3.12 final, but I
preferred to overtest rather than send things in for a late -rc, and
then my travel schedule intervened"

* 'for-linus-bugs' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
ceph: allocate non-zero page to fscache in readpage()
ceph: wake up 'safe' waiters when unregistering request
ceph: cleanup aborted requests when re-sending requests.
ceph: handle race between cap reconnect and cap release
ceph: set caps count after composing cap reconnect message
ceph: queue cap release in __ceph_remove_cap()
ceph: handle frag mismatch between readdir request and reply
ceph: remove outdated frag information
ceph: hung on ceph fscache invalidate in some cases

+121 -41
+1 -1
fs/ceph/addr.c
··· 216 216 } 217 217 SetPageUptodate(page); 218 218 219 - if (err == 0) 219 + if (err >= 0) 220 220 ceph_readpage_to_fscache(inode, page); 221 221 222 222 out:
+3
fs/ceph/cache.c
··· 324 324 { 325 325 struct ceph_inode_info *ci = ceph_inode(inode); 326 326 327 + if (!PageFsCache(page)) 328 + return; 329 + 327 330 fscache_wait_on_page_write(ci->fscache, page); 328 331 fscache_uncache_page(ci->fscache, page); 329 332 }
+17 -10
fs/ceph/caps.c
··· 897 897 * caller should hold i_ceph_lock. 898 898 * caller will not hold session s_mutex if called from destroy_inode. 899 899 */ 900 - void __ceph_remove_cap(struct ceph_cap *cap) 900 + void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) 901 901 { 902 902 struct ceph_mds_session *session = cap->session; 903 903 struct ceph_inode_info *ci = cap->ci; ··· 909 909 910 910 /* remove from session list */ 911 911 spin_lock(&session->s_cap_lock); 912 + /* 913 + * s_cap_reconnect is protected by s_cap_lock. no one changes 914 + * s_cap_gen while session is in the reconnect state. 915 + */ 916 + if (queue_release && 917 + (!session->s_cap_reconnect || 918 + cap->cap_gen == session->s_cap_gen)) 919 + __queue_cap_release(session, ci->i_vino.ino, cap->cap_id, 920 + cap->mseq, cap->issue_seq); 921 + 912 922 if (session->s_cap_iterator == cap) { 913 923 /* not yet, we are iterating over this very cap */ 914 924 dout("__ceph_remove_cap delaying %p removal from session %p\n", ··· 1033 1023 struct ceph_mds_cap_release *head; 1034 1024 struct ceph_mds_cap_item *item; 1035 1025 1036 - spin_lock(&session->s_cap_lock); 1037 1026 BUG_ON(!session->s_num_cap_releases); 1038 1027 msg = list_first_entry(&session->s_cap_releases, 1039 1028 struct ceph_msg, list_head); ··· 1061 1052 (int)CEPH_CAPS_PER_RELEASE, 1062 1053 (int)msg->front.iov_len); 1063 1054 } 1064 - spin_unlock(&session->s_cap_lock); 1065 1055 } 1066 1056 1067 1057 /* ··· 1075 1067 p = rb_first(&ci->i_caps); 1076 1068 while (p) { 1077 1069 struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node); 1078 - struct ceph_mds_session *session = cap->session; 1079 - 1080 - __queue_cap_release(session, ceph_ino(inode), cap->cap_id, 1081 - cap->mseq, cap->issue_seq); 1082 1070 p = rb_next(p); 1083 - __ceph_remove_cap(cap); 1071 + __ceph_remove_cap(cap, true); 1084 1072 } 1085 1073 } 1086 1074 ··· 2795 2791 } 2796 2792 spin_unlock(&mdsc->cap_dirty_lock); 2797 2793 } 2798 - __ceph_remove_cap(cap); 2794 + __ceph_remove_cap(cap, false); 2799 2795 } 2800 2796 /* else, we already released it */ 2801 2797 ··· 2935 2931 if (!inode) { 2936 2932 dout(" i don't have ino %llx\n", vino.ino); 2937 2933 2938 - if (op == CEPH_CAP_OP_IMPORT) 2934 + if (op == CEPH_CAP_OP_IMPORT) { 2935 + spin_lock(&session->s_cap_lock); 2939 2936 __queue_cap_release(session, vino.ino, cap_id, 2940 2937 mseq, seq); 2938 + spin_unlock(&session->s_cap_lock); 2939 + } 2941 2940 goto flush_cap_releases; 2942 2941 } 2943 2942
+10 -1
fs/ceph/dir.c
··· 352 352 } 353 353 354 354 /* note next offset and last dentry name */ 355 + rinfo = &req->r_reply_info; 356 + if (le32_to_cpu(rinfo->dir_dir->frag) != frag) { 357 + frag = le32_to_cpu(rinfo->dir_dir->frag); 358 + if (ceph_frag_is_leftmost(frag)) 359 + fi->next_offset = 2; 360 + else 361 + fi->next_offset = 0; 362 + off = fi->next_offset; 363 + } 355 364 fi->offset = fi->next_offset; 356 365 fi->last_readdir = req; 366 + fi->frag = frag; 357 367 358 368 if (req->r_reply_info.dir_end) { 359 369 kfree(fi->last_name); ··· 373 363 else 374 364 fi->next_offset = 0; 375 365 } else { 376 - rinfo = &req->r_reply_info; 377 366 err = note_last_dentry(fi, 378 367 rinfo->dir_dname[rinfo->dir_nr-1], 379 368 rinfo->dir_dname_len[rinfo->dir_nr-1]);
+43 -6
fs/ceph/inode.c
··· 577 577 int issued = 0, implemented; 578 578 struct timespec mtime, atime, ctime; 579 579 u32 nsplits; 580 + struct ceph_inode_frag *frag; 581 + struct rb_node *rb_node; 580 582 struct ceph_buffer *xattr_blob = NULL; 581 583 int err = 0; 582 584 int queue_trunc = 0; ··· 753 751 /* FIXME: move me up, if/when version reflects fragtree changes */ 754 752 nsplits = le32_to_cpu(info->fragtree.nsplits); 755 753 mutex_lock(&ci->i_fragtree_mutex); 754 + rb_node = rb_first(&ci->i_fragtree); 756 755 for (i = 0; i < nsplits; i++) { 757 756 u32 id = le32_to_cpu(info->fragtree.splits[i].frag); 758 - struct ceph_inode_frag *frag = __get_or_create_frag(ci, id); 759 - 760 - if (IS_ERR(frag)) 761 - continue; 757 + frag = NULL; 758 + while (rb_node) { 759 + frag = rb_entry(rb_node, struct ceph_inode_frag, node); 760 + if (ceph_frag_compare(frag->frag, id) >= 0) { 761 + if (frag->frag != id) 762 + frag = NULL; 763 + else 764 + rb_node = rb_next(rb_node); 765 + break; 766 + } 767 + rb_node = rb_next(rb_node); 768 + rb_erase(&frag->node, &ci->i_fragtree); 769 + kfree(frag); 770 + frag = NULL; 771 + } 772 + if (!frag) { 773 + frag = __get_or_create_frag(ci, id); 774 + if (IS_ERR(frag)) 775 + continue; 776 + } 762 777 frag->split_by = le32_to_cpu(info->fragtree.splits[i].by); 763 778 dout(" frag %x split by %d\n", frag->frag, frag->split_by); 779 + } 780 + while (rb_node) { 781 + frag = rb_entry(rb_node, struct ceph_inode_frag, node); 782 + rb_node = rb_next(rb_node); 783 + rb_erase(&frag->node, &ci->i_fragtree); 784 + kfree(frag); 764 785 } 765 786 mutex_unlock(&ci->i_fragtree_mutex); 766 787 ··· 1275 1250 int err = 0, i; 1276 1251 struct inode *snapdir = NULL; 1277 1252 struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; 1278 - u64 frag = le32_to_cpu(rhead->args.readdir.frag); 1279 1253 struct ceph_dentry_info *di; 1254 + u64 r_readdir_offset = req->r_readdir_offset; 1255 + u32 frag = le32_to_cpu(rhead->args.readdir.frag); 1256 + 1257 + if (rinfo->dir_dir && 1258 + le32_to_cpu(rinfo->dir_dir->frag) != frag) { 1259 + dout("readdir_prepopulate got new frag %x -> %x\n", 1260 + frag, le32_to_cpu(rinfo->dir_dir->frag)); 1261 + frag = le32_to_cpu(rinfo->dir_dir->frag); 1262 + if (ceph_frag_is_leftmost(frag)) 1263 + r_readdir_offset = 2; 1264 + else 1265 + r_readdir_offset = 0; 1266 + } 1280 1267 1281 1268 if (req->r_aborted) 1282 1269 return readdir_prepopulate_inodes_only(req, session); ··· 1352 1315 } 1353 1316 1354 1317 di = dn->d_fsdata; 1355 - di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset); 1318 + di->offset = ceph_make_fpos(frag, i + r_readdir_offset); 1356 1319 1357 1320 /* inode */ 1358 1321 if (dn->d_inode) {
+45 -16
fs/ceph/mds_client.c
··· 43 43 */ 44 44 45 45 struct ceph_reconnect_state { 46 + int nr_caps; 46 47 struct ceph_pagelist *pagelist; 47 48 bool flock; 48 49 }; ··· 444 443 INIT_LIST_HEAD(&s->s_waiting); 445 444 INIT_LIST_HEAD(&s->s_unsafe); 446 445 s->s_num_cap_releases = 0; 446 + s->s_cap_reconnect = 0; 447 447 s->s_cap_iterator = NULL; 448 448 INIT_LIST_HEAD(&s->s_cap_releases); 449 449 INIT_LIST_HEAD(&s->s_cap_releases_done); ··· 643 641 iput(req->r_unsafe_dir); 644 642 req->r_unsafe_dir = NULL; 645 643 } 644 + 645 + complete_all(&req->r_safe_completion); 646 646 647 647 ceph_mdsc_put_request(req); 648 648 } ··· 990 986 dout("removing cap %p, ci is %p, inode is %p\n", 991 987 cap, ci, &ci->vfs_inode); 992 988 spin_lock(&ci->i_ceph_lock); 993 - __ceph_remove_cap(cap); 989 + __ceph_remove_cap(cap, false); 994 990 if (!__ceph_is_any_real_caps(ci)) { 995 991 struct ceph_mds_client *mdsc = 996 992 ceph_sb_to_client(inode->i_sb)->mdsc; ··· 1235 1231 session->s_trim_caps--; 1236 1232 if (oissued) { 1237 1233 /* we aren't the only cap.. 
just remove us */ 1238 - __queue_cap_release(session, ceph_ino(inode), cap->cap_id, 1239 - cap->mseq, cap->issue_seq); 1240 - __ceph_remove_cap(cap); 1234 + __ceph_remove_cap(cap, true); 1241 1235 } else { 1242 1236 /* try to drop referring dentries */ 1243 1237 spin_unlock(&ci->i_ceph_lock); ··· 1418 1416 unsigned num; 1419 1417 1420 1418 dout("discard_cap_releases mds%d\n", session->s_mds); 1421 - spin_lock(&session->s_cap_lock); 1422 1419 1423 1420 /* zero out the in-progress message */ 1424 1421 msg = list_first_entry(&session->s_cap_releases, ··· 1444 1443 msg->front.iov_len = sizeof(*head); 1445 1444 list_add(&msg->list_head, &session->s_cap_releases); 1446 1445 } 1447 - 1448 - spin_unlock(&session->s_cap_lock); 1449 1446 } 1450 1447 1451 1448 /* ··· 1874 1875 int mds = -1; 1875 1876 int err = -EAGAIN; 1876 1877 1877 - if (req->r_err || req->r_got_result) 1878 + if (req->r_err || req->r_got_result) { 1879 + if (req->r_aborted) 1880 + __unregister_request(mdsc, req); 1878 1881 goto out; 1882 + } 1879 1883 1880 1884 if (req->r_timeout && 1881 1885 time_after_eq(jiffies, req->r_started + req->r_timeout)) { ··· 2188 2186 if (head->safe) { 2189 2187 req->r_got_safe = true; 2190 2188 __unregister_request(mdsc, req); 2191 - complete_all(&req->r_safe_completion); 2192 2189 2193 2190 if (req->r_got_unsafe) { 2194 2191 /* ··· 2239 2238 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); 2240 2239 if (err == 0) { 2241 2240 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 2242 - req->r_op == CEPH_MDS_OP_LSSNAP) && 2243 - rinfo->dir_nr) 2241 + req->r_op == CEPH_MDS_OP_LSSNAP)) 2244 2242 ceph_readdir_prepopulate(req, req->r_session); 2245 2243 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2246 2244 } ··· 2490 2490 cap->seq = 0; /* reset cap seq */ 2491 2491 cap->issue_seq = 0; /* and issue_seq */ 2492 2492 cap->mseq = 0; /* and migrate_seq */ 2493 + cap->cap_gen = cap->session->s_cap_gen; 2493 2494 2494 2495 if (recon_state->flock) { 2495 2496 
rec.v2.cap_id = cpu_to_le64(cap->cap_id); ··· 2553 2552 } else { 2554 2553 err = ceph_pagelist_append(pagelist, &rec, reclen); 2555 2554 } 2555 + 2556 + recon_state->nr_caps++; 2556 2557 out_free: 2557 2558 kfree(path); 2558 2559 out_dput: ··· 2582 2579 struct rb_node *p; 2583 2580 int mds = session->s_mds; 2584 2581 int err = -ENOMEM; 2582 + int s_nr_caps; 2585 2583 struct ceph_pagelist *pagelist; 2586 2584 struct ceph_reconnect_state recon_state; 2587 2585 ··· 2614 2610 dout("session %p state %s\n", session, 2615 2611 session_state_name(session->s_state)); 2616 2612 2613 + spin_lock(&session->s_gen_ttl_lock); 2614 + session->s_cap_gen++; 2615 + spin_unlock(&session->s_gen_ttl_lock); 2616 + 2617 + spin_lock(&session->s_cap_lock); 2618 + /* 2619 + * notify __ceph_remove_cap() that we are composing cap reconnect. 2620 + * If a cap get released before being added to the cap reconnect, 2621 + * __ceph_remove_cap() should skip queuing cap release. 2622 + */ 2623 + session->s_cap_reconnect = 1; 2617 2624 /* drop old cap expires; we're about to reestablish that state */ 2618 2625 discard_cap_releases(mdsc, session); 2626 + spin_unlock(&session->s_cap_lock); 2619 2627 2620 2628 /* traverse this session's caps */ 2621 - err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); 2629 + s_nr_caps = session->s_nr_caps; 2630 + err = ceph_pagelist_encode_32(pagelist, s_nr_caps); 2622 2631 if (err) 2623 2632 goto fail; 2624 2633 2634 + recon_state.nr_caps = 0; 2625 2635 recon_state.pagelist = pagelist; 2626 2636 recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK; 2627 2637 err = iterate_session_caps(session, encode_caps_cb, &recon_state); 2628 2638 if (err < 0) 2629 2639 goto fail; 2640 + 2641 + spin_lock(&session->s_cap_lock); 2642 + session->s_cap_reconnect = 0; 2643 + spin_unlock(&session->s_cap_lock); 2630 2644 2631 2645 /* 2632 2646 * snaprealms. 
we provide mds with the ino, seq (version), and ··· 2668 2646 2669 2647 if (recon_state.flock) 2670 2648 reply->hdr.version = cpu_to_le16(2); 2671 - if (pagelist->length) { 2672 - /* set up outbound data if we have any */ 2673 - reply->hdr.data_len = cpu_to_le32(pagelist->length); 2674 - ceph_msg_data_add_pagelist(reply, pagelist); 2649 + 2650 + /* raced with cap release? */ 2651 + if (s_nr_caps != recon_state.nr_caps) { 2652 + struct page *page = list_first_entry(&pagelist->head, 2653 + struct page, lru); 2654 + __le32 *addr = kmap_atomic(page); 2655 + *addr = cpu_to_le32(recon_state.nr_caps); 2656 + kunmap_atomic(addr); 2675 2657 } 2658 + 2659 + reply->hdr.data_len = cpu_to_le32(pagelist->length); 2660 + ceph_msg_data_add_pagelist(reply, pagelist); 2676 2661 ceph_con_send(&session->s_con, reply); 2677 2662 2678 2663 mutex_unlock(&session->s_mutex);
+1
fs/ceph/mds_client.h
··· 132 132 struct list_head s_caps; /* all caps issued by this session */ 133 133 int s_nr_caps, s_trim_caps; 134 134 int s_num_cap_releases; 135 + int s_cap_reconnect; 135 136 struct list_head s_cap_releases; /* waiting cap_release messages */ 136 137 struct list_head s_cap_releases_done; /* ready to send */ 137 138 struct ceph_cap *s_cap_iterator;
+1 -7
fs/ceph/super.h
··· 741 741 int fmode, unsigned issued, unsigned wanted, 742 742 unsigned cap, unsigned seq, u64 realmino, int flags, 743 743 struct ceph_cap_reservation *caps_reservation); 744 - extern void __ceph_remove_cap(struct ceph_cap *cap); 745 - static inline void ceph_remove_cap(struct ceph_cap *cap) 746 - { 747 - spin_lock(&cap->ci->i_ceph_lock); 748 - __ceph_remove_cap(cap); 749 - spin_unlock(&cap->ci->i_ceph_lock); 750 - } 744 + extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); 751 745 extern void ceph_put_cap(struct ceph_mds_client *mdsc, 752 746 struct ceph_cap *cap); 753 747