Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

ceph: improve reference tracking for snaprealm

When a snaprealm is created, its initial reference count is zero.
But in some rare cases, the newly created snaprealm is not referenced
by anyone. This causes a snaprealm with a zero reference count to never be freed.

The fix is to set the reference count of a newly created snaprealm to 1.
The reference is returned to the function that requested creation of the
snaprealm. When that function finishes its job, it releases the reference.

Signed-off-by: Yan, Zheng <zyan@redhat.com>

authored by

Yan, Zheng and committed by
Ilya Dryomov
982d6011 1487a688

+63 -27
+16 -8
fs/ceph/caps.c
··· 577 577 struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc, 578 578 realmino); 579 579 if (realm) { 580 - ceph_get_snap_realm(mdsc, realm); 581 580 spin_lock(&realm->inodes_with_caps_lock); 582 581 ci->i_snap_realm = realm; 583 582 list_add(&ci->i_snap_realm_item, ··· 2446 2447 */ 2447 2448 static void handle_cap_grant(struct ceph_mds_client *mdsc, 2448 2449 struct inode *inode, struct ceph_mds_caps *grant, 2449 - void *snaptrace, int snaptrace_len, 2450 2450 u64 inline_version, 2451 2451 void *inline_data, int inline_len, 2452 2452 struct ceph_buffer *xattr_buf, 2453 2453 struct ceph_mds_session *session, 2454 2454 struct ceph_cap *cap, int issued) 2455 2455 __releases(ci->i_ceph_lock) 2456 + __releases(mdsc->snap_rwsem) 2456 2457 { 2457 2458 struct ceph_inode_info *ci = ceph_inode(inode); 2458 2459 int mds = session->s_mds; ··· 2653 2654 spin_unlock(&ci->i_ceph_lock); 2654 2655 2655 2656 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { 2656 - down_write(&mdsc->snap_rwsem); 2657 - ceph_update_snap_trace(mdsc, snaptrace, 2658 - snaptrace + snaptrace_len, false); 2659 - downgrade_write(&mdsc->snap_rwsem); 2660 2657 kick_flushing_inode_caps(mdsc, session, inode); 2661 2658 up_read(&mdsc->snap_rwsem); 2662 2659 if (newcaps & ~issued) ··· 3062 3067 struct ceph_cap *cap; 3063 3068 struct ceph_mds_caps *h; 3064 3069 struct ceph_mds_cap_peer *peer = NULL; 3070 + struct ceph_snap_realm *realm; 3065 3071 int mds = session->s_mds; 3066 3072 int op, issued; 3067 3073 u32 seq, mseq; ··· 3164 3168 goto done_unlocked; 3165 3169 3166 3170 case CEPH_CAP_OP_IMPORT: 3171 + realm = NULL; 3172 + if (snaptrace_len) { 3173 + down_write(&mdsc->snap_rwsem); 3174 + ceph_update_snap_trace(mdsc, snaptrace, 3175 + snaptrace + snaptrace_len, 3176 + false, &realm); 3177 + downgrade_write(&mdsc->snap_rwsem); 3178 + } else { 3179 + down_read(&mdsc->snap_rwsem); 3180 + } 3167 3181 handle_cap_import(mdsc, inode, h, peer, session, 3168 3182 &cap, &issued); 3169 - 
handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, 3183 + handle_cap_grant(mdsc, inode, h, 3170 3184 inline_version, inline_data, inline_len, 3171 3185 msg->middle, session, cap, issued); 3186 + if (realm) 3187 + ceph_put_snap_realm(mdsc, realm); 3172 3188 goto done_unlocked; 3173 3189 } 3174 3190 ··· 3200 3192 case CEPH_CAP_OP_GRANT: 3201 3193 __ceph_caps_issued(ci, &issued); 3202 3194 issued |= __ceph_caps_dirty(ci); 3203 - handle_cap_grant(mdsc, inode, h, NULL, 0, 3195 + handle_cap_grant(mdsc, inode, h, 3204 3196 inline_version, inline_data, inline_len, 3205 3197 msg->middle, session, cap, issued); 3206 3198 goto done_unlocked;
+7 -2
fs/ceph/mds_client.c
··· 2286 2286 struct ceph_mds_request *req; 2287 2287 struct ceph_mds_reply_head *head = msg->front.iov_base; 2288 2288 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2289 + struct ceph_snap_realm *realm; 2289 2290 u64 tid; 2290 2291 int err, result; 2291 2292 int mds = session->s_mds; ··· 2402 2401 } 2403 2402 2404 2403 /* snap trace */ 2404 + realm = NULL; 2405 2405 if (rinfo->snapblob_len) { 2406 2406 down_write(&mdsc->snap_rwsem); 2407 2407 ceph_update_snap_trace(mdsc, rinfo->snapblob, 2408 - rinfo->snapblob + rinfo->snapblob_len, 2409 - le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP); 2408 + rinfo->snapblob + rinfo->snapblob_len, 2409 + le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 2410 + &realm); 2410 2411 downgrade_write(&mdsc->snap_rwsem); 2411 2412 } else { 2412 2413 down_read(&mdsc->snap_rwsem); ··· 2426 2423 mutex_unlock(&req->r_fill_mutex); 2427 2424 2428 2425 up_read(&mdsc->snap_rwsem); 2426 + if (realm) 2427 + ceph_put_snap_realm(mdsc, realm); 2429 2428 out_err: 2430 2429 mutex_lock(&mdsc->mutex); 2431 2430 if (!req->r_aborted) {
+38 -16
fs/ceph/snap.c
··· 70 70 * safe. we do need to protect against concurrent empty list 71 71 * additions, however. 72 72 */ 73 - if (atomic_read(&realm->nref) == 0) { 73 + if (atomic_inc_return(&realm->nref) == 1) { 74 74 spin_lock(&mdsc->snap_empty_lock); 75 75 list_del_init(&realm->empty_item); 76 76 spin_unlock(&mdsc->snap_empty_lock); 77 77 } 78 - 79 - atomic_inc(&realm->nref); 80 78 } 81 79 82 80 static void __insert_snap_realm(struct rb_root *root, ··· 114 116 if (!realm) 115 117 return ERR_PTR(-ENOMEM); 116 118 117 - atomic_set(&realm->nref, 0); /* tree does not take a ref */ 119 + atomic_set(&realm->nref, 1); /* for caller */ 118 120 realm->ino = ino; 119 121 INIT_LIST_HEAD(&realm->children); 120 122 INIT_LIST_HEAD(&realm->child_item); ··· 132 134 * 133 135 * caller must hold snap_rwsem for write. 134 136 */ 135 - struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, 136 - u64 ino) 137 + static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc, 138 + u64 ino) 137 139 { 138 140 struct rb_node *n = mdsc->snap_realms.rb_node; 139 141 struct ceph_snap_realm *r; ··· 150 152 } 151 153 } 152 154 return NULL; 155 + } 156 + 157 + struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, 158 + u64 ino) 159 + { 160 + struct ceph_snap_realm *r; 161 + r = __lookup_snap_realm(mdsc, ino); 162 + if (r) 163 + ceph_get_snap_realm(mdsc, r); 164 + return r; 153 165 } 154 166 155 167 static void __put_snap_realm(struct ceph_mds_client *mdsc, ··· 281 273 } 282 274 realm->parent_ino = parentino; 283 275 realm->parent = parent; 284 - ceph_get_snap_realm(mdsc, parent); 285 276 list_add(&realm->child_item, &parent->children); 286 277 return 1; 287 278 } ··· 638 631 * Caller must hold snap_rwsem for write. 
639 632 */ 640 633 int ceph_update_snap_trace(struct ceph_mds_client *mdsc, 641 - void *p, void *e, bool deletion) 634 + void *p, void *e, bool deletion, 635 + struct ceph_snap_realm **realm_ret) 642 636 { 643 637 struct ceph_mds_snap_realm *ri; /* encoded */ 644 638 __le64 *snaps; /* encoded */ 645 639 __le64 *prior_parent_snaps; /* encoded */ 646 - struct ceph_snap_realm *realm; 640 + struct ceph_snap_realm *realm = NULL; 641 + struct ceph_snap_realm *first_realm = NULL; 647 642 int invalidate = 0; 648 643 int err = -ENOMEM; 649 644 LIST_HEAD(dirty_realms); ··· 713 704 dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino, 714 705 realm, invalidate, p, e); 715 706 707 + /* invalidate when we reach the _end_ (root) of the trace */ 708 + if (invalidate && p >= e) 709 + rebuild_snap_realms(realm); 710 + 711 + if (!first_realm) 712 + first_realm = realm; 713 + else 714 + ceph_put_snap_realm(mdsc, realm); 715 + 716 716 if (p < e) 717 717 goto more; 718 - 719 - /* invalidate when we reach the _end_ (root) of the trace */ 720 - if (invalidate) 721 - rebuild_snap_realms(realm); 722 718 723 719 /* 724 720 * queue cap snaps _after_ we've built the new snap contexts, ··· 735 721 queue_realm_cap_snaps(realm); 736 722 } 737 723 724 + if (realm_ret) 725 + *realm_ret = first_realm; 726 + else 727 + ceph_put_snap_realm(mdsc, first_realm); 728 + 738 729 __cleanup_empty_realms(mdsc); 739 730 return 0; 740 731 741 732 bad: 742 733 err = -EINVAL; 743 734 fail: 735 + if (realm && !IS_ERR(realm)) 736 + ceph_put_snap_realm(mdsc, realm); 737 + if (first_realm) 738 + ceph_put_snap_realm(mdsc, first_realm); 744 739 pr_err("update_snap_trace error %d\n", err); 745 740 return err; 746 741 } ··· 867 844 if (IS_ERR(realm)) 868 845 goto out; 869 846 } 870 - ceph_get_snap_realm(mdsc, realm); 871 847 872 848 dout("splitting snap_realm %llx %p\n", realm->ino, realm); 873 849 for (i = 0; i < num_split_inos; i++) { ··· 927 905 /* we may have taken some of the old realm's children. 
*/ 928 906 for (i = 0; i < num_split_realms; i++) { 929 907 struct ceph_snap_realm *child = 930 - ceph_lookup_snap_realm(mdsc, 908 + __lookup_snap_realm(mdsc, 931 909 le64_to_cpu(split_realms[i])); 932 910 if (!child) 933 911 continue; ··· 940 918 * snap, we can avoid queueing cap_snaps. 941 919 */ 942 920 ceph_update_snap_trace(mdsc, p, e, 943 - op == CEPH_SNAP_OP_DESTROY); 921 + op == CEPH_SNAP_OP_DESTROY, NULL); 944 922 945 923 if (op == CEPH_SNAP_OP_SPLIT) 946 924 /* we took a reference when we created the realm, above */
+2 -1
fs/ceph/super.h
··· 693 693 extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc, 694 694 struct ceph_snap_realm *realm); 695 695 extern int ceph_update_snap_trace(struct ceph_mds_client *m, 696 - void *p, void *e, bool deletion); 696 + void *p, void *e, bool deletion, 697 + struct ceph_snap_realm **realm_ret); 697 698 extern void ceph_handle_snap(struct ceph_mds_client *mdsc, 698 699 struct ceph_mds_session *session, 699 700 struct ceph_msg *msg);