ceph: release cap on import if we don't have the inode

If we get an IMPORT that gives us a cap, but we don't have the inode, queue
a release (and try to send it immediately) so that the MDS doesn't get
stuck waiting for us.

Signed-off-by: Sage Weil <sage@newdream.net>

Sage Weil 3d7ded4d 9dbd412f

+61 -38
+55 -35
fs/ceph/caps.c
··· 981 return 0; 982 } 983 984 /* 985 * Queue cap releases when an inode is dropped from our cache. Since 986 * inode is about to be destroyed, there is no need for i_lock. ··· 1034 while (p) { 1035 struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node); 1036 struct ceph_mds_session *session = cap->session; 1037 - struct ceph_msg *msg; 1038 - struct ceph_mds_cap_release *head; 1039 - struct ceph_mds_cap_item *item; 1040 1041 - spin_lock(&session->s_cap_lock); 1042 - BUG_ON(!session->s_num_cap_releases); 1043 - msg = list_first_entry(&session->s_cap_releases, 1044 - struct ceph_msg, list_head); 1045 - 1046 - dout(" adding %p release to mds%d msg %p (%d left)\n", 1047 - inode, session->s_mds, msg, session->s_num_cap_releases); 1048 - 1049 - BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE); 1050 - head = msg->front.iov_base; 1051 - head->num = cpu_to_le32(le32_to_cpu(head->num) + 1); 1052 - item = msg->front.iov_base + msg->front.iov_len; 1053 - item->ino = cpu_to_le64(ceph_ino(inode)); 1054 - item->cap_id = cpu_to_le64(cap->cap_id); 1055 - item->migrate_seq = cpu_to_le32(cap->mseq); 1056 - item->seq = cpu_to_le32(cap->issue_seq); 1057 - 1058 - session->s_num_cap_releases--; 1059 - 1060 - msg->front.iov_len += sizeof(*item); 1061 - if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 1062 - dout(" release msg %p full\n", msg); 1063 - list_move_tail(&msg->list_head, 1064 - &session->s_cap_releases_done); 1065 - } else { 1066 - dout(" release msg %p at %d/%d (%d)\n", msg, 1067 - (int)le32_to_cpu(head->num), 1068 - (int)CEPH_CAPS_PER_RELEASE, 1069 - (int)msg->front.iov_len); 1070 - } 1071 - spin_unlock(&session->s_cap_lock); 1072 p = rb_next(p); 1073 __ceph_remove_cap(cap); 1074 } ··· 2663 struct ceph_mds_caps *h; 2664 int mds = session->s_mds; 2665 int op; 2666 - u32 seq; 2667 struct ceph_vino vino; 2668 u64 cap_id; 2669 u64 size, max_size; ··· 2683 vino.snap = CEPH_NOSNAP; 2684 cap_id = le64_to_cpu(h->cap_id); 2685 seq = le32_to_cpu(h->seq); 2686 
size = le64_to_cpu(h->size); 2687 max_size = le64_to_cpu(h->max_size); 2688 ··· 2698 vino.snap, inode); 2699 if (!inode) { 2700 dout(" i don't have ino %llx\n", vino.ino); 2701 goto done; 2702 } 2703
··· 981 return 0; 982 } 983 984 + static void __queue_cap_release(struct ceph_mds_session *session, 985 + u64 ino, u64 cap_id, u32 migrate_seq, 986 + u32 issue_seq) 987 + { 988 + struct ceph_msg *msg; 989 + struct ceph_mds_cap_release *head; 990 + struct ceph_mds_cap_item *item; 991 + 992 + spin_lock(&session->s_cap_lock); 993 + BUG_ON(!session->s_num_cap_releases); 994 + msg = list_first_entry(&session->s_cap_releases, 995 + struct ceph_msg, list_head); 996 + 997 + dout(" adding %llx release to mds%d msg %p (%d left)\n", 998 + ino, session->s_mds, msg, session->s_num_cap_releases); 999 + 1000 + BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE); 1001 + head = msg->front.iov_base; 1002 + head->num = cpu_to_le32(le32_to_cpu(head->num) + 1); 1003 + item = msg->front.iov_base + msg->front.iov_len; 1004 + item->ino = cpu_to_le64(ino); 1005 + item->cap_id = cpu_to_le64(cap_id); 1006 + item->migrate_seq = cpu_to_le32(migrate_seq); 1007 + item->seq = cpu_to_le32(issue_seq); 1008 + 1009 + session->s_num_cap_releases--; 1010 + 1011 + msg->front.iov_len += sizeof(*item); 1012 + if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 1013 + dout(" release msg %p full\n", msg); 1014 + list_move_tail(&msg->list_head, &session->s_cap_releases_done); 1015 + } else { 1016 + dout(" release msg %p at %d/%d (%d)\n", msg, 1017 + (int)le32_to_cpu(head->num), 1018 + (int)CEPH_CAPS_PER_RELEASE, 1019 + (int)msg->front.iov_len); 1020 + } 1021 + spin_unlock(&session->s_cap_lock); 1022 + } 1023 + 1024 /* 1025 * Queue cap releases when an inode is dropped from our cache. Since 1026 * inode is about to be destroyed, there is no need for i_lock. 
··· 994 while (p) { 995 struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node); 996 struct ceph_mds_session *session = cap->session; 997 998 + __queue_cap_release(session, ceph_ino(inode), cap->cap_id, 999 + cap->mseq, cap->issue_seq); 1000 p = rb_next(p); 1001 __ceph_remove_cap(cap); 1002 } ··· 2655 struct ceph_mds_caps *h; 2656 int mds = session->s_mds; 2657 int op; 2658 + u32 seq, mseq; 2659 struct ceph_vino vino; 2660 u64 cap_id; 2661 u64 size, max_size; ··· 2675 vino.snap = CEPH_NOSNAP; 2676 cap_id = le64_to_cpu(h->cap_id); 2677 seq = le32_to_cpu(h->seq); 2678 + mseq = le32_to_cpu(h->migrate_seq); 2679 size = le64_to_cpu(h->size); 2680 max_size = le64_to_cpu(h->max_size); 2681 ··· 2689 vino.snap, inode); 2690 if (!inode) { 2691 dout(" i don't have ino %llx\n", vino.ino); 2692 + 2693 + if (op == CEPH_CAP_OP_IMPORT) 2694 + __queue_cap_release(session, vino.ino, cap_id, 2695 + mseq, seq); 2696 + 2697 + /* 2698 + * send any full release message to try to move things 2699 + * along for the mds (who clearly thinks we still have this 2700 + * cap). 2701 + */ 2702 + ceph_send_cap_releases(mdsc, session); 2703 goto done; 2704 } 2705
+3 -3
fs/ceph/mds_client.c
··· 1176 /* 1177 * called under s_mutex 1178 */ 1179 - static void send_cap_releases(struct ceph_mds_client *mdsc, 1180 - struct ceph_mds_session *session) 1181 { 1182 struct ceph_msg *msg; 1183 ··· 2693 add_cap_releases(mdsc, s, -1); 2694 if (s->s_state == CEPH_MDS_SESSION_OPEN || 2695 s->s_state == CEPH_MDS_SESSION_HUNG) 2696 - send_cap_releases(mdsc, s); 2697 mutex_unlock(&s->s_mutex); 2698 ceph_put_mds_session(s); 2699
··· 1176 /* 1177 * called under s_mutex 1178 */ 1179 + void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1180 + struct ceph_mds_session *session) 1181 { 1182 struct ceph_msg *msg; 1183 ··· 2693 add_cap_releases(mdsc, s, -1); 2694 if (s->s_state == CEPH_MDS_SESSION_OPEN || 2695 s->s_state == CEPH_MDS_SESSION_HUNG) 2696 + ceph_send_cap_releases(mdsc, s); 2697 mutex_unlock(&s->s_mutex); 2698 ceph_put_mds_session(s); 2699
+3
fs/ceph/mds_client.h
··· 322 kref_put(&req->r_kref, ceph_mdsc_release_request); 323 } 324 325 extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc); 326 327 extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
··· 322 kref_put(&req->r_kref, ceph_mdsc_release_request); 323 } 324 325 + extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 326 + struct ceph_mds_session *session); 327 + 328 extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc); 329 330 extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,