git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git

rbd: support for exclusive-lock feature

Add basic support for the RBD_FEATURE_EXCLUSIVE_LOCK feature. Maintenance
operations (resize, snapshot create, etc.) are offloaded to librbd by
returning -EOPNOTSUPP - librbd should request the lock and execute the
operation.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Reviewed-by: Mike Christie <mchristi@redhat.com>
Tested-by: Mike Christie <mchristi@redhat.com>
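
The -EOPNOTSUPP contract is worth spelling out: the kernel client never
performs a maintenance operation unless it already owns the exclusive lock.
Below is a hypothetical sketch (not part of this patch; the handler name is
invented) of how such an operation would be bounced to librbd, using the
rbd_is_lock_supported() and rbd_is_lock_owner() helpers the patch adds:

/*
 * Illustrative only - not from this patch.  rbd_is_lock_supported()
 * and rbd_is_lock_owner() are real helpers added below; the surrounding
 * handler is hypothetical.
 */
static int rbd_dev_maintenance_op(struct rbd_device *rbd_dev)
{
        if (rbd_is_lock_supported(rbd_dev) && !rbd_is_lock_owner(rbd_dev))
                return -EOPNOTSUPP; /* let librbd take the lock and do it */

        /* ... perform the operation locally ... */
        return 0;
}

librbd is expected to react to -EOPNOTSUPP by requesting the lock itself and
executing the operation, per the commit message above.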

3 files changed: +812 -20

drivers/block/rbd.c (+800 -20)
···
 #include <linux/ceph/libceph.h>
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
+#include <linux/ceph/cls_lock_client.h>
 #include <linux/ceph/decode.h>
 #include <linux/parser.h>
 #include <linux/bsearch.h>
···

 #define RBD_OBJ_PREFIX_LEN_MAX  64

+#define RBD_NOTIFY_TIMEOUT      5       /* seconds */
 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)

 /* Feature bits */

 #define RBD_FEATURE_LAYERING    (1<<0)
 #define RBD_FEATURE_STRIPINGV2  (1<<1)
-#define RBD_FEATURES_ALL \
-            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
+#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
+#define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
+                                 RBD_FEATURE_STRIPINGV2 |       \
+                                 RBD_FEATURE_EXCLUSIVE_LOCK)

 /* Features supported by this (client software) implementation. */
···
         RBD_WATCH_STATE_ERROR,
 };

+enum rbd_lock_state {
+        RBD_LOCK_STATE_UNLOCKED,
+        RBD_LOCK_STATE_LOCKED,
+        RBD_LOCK_STATE_RELEASING,
+};
+
+/* WatchNotify::ClientId */
+struct rbd_client_id {
+        u64 gid;
+        u64 handle;
+};
+
 struct rbd_mapping {
         u64 size;
         u64 features;
···
         struct ceph_osd_linger_request *watch_handle;
         u64                     watch_cookie;
         struct delayed_work     watch_dwork;
+
+        struct rw_semaphore     lock_rwsem;
+        enum rbd_lock_state     lock_state;
+        struct rbd_client_id    owner_cid;
+        struct work_struct      acquired_lock_work;
+        struct work_struct      released_lock_work;
+        struct delayed_work     lock_dwork;
+        struct work_struct      unlock_work;
+        wait_queue_head_t       lock_waitq;

         struct workqueue_struct *task_wq;
···
 static int minor_to_rbd_dev_id(int minor)
 {
         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
+}
+
+static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
+{
+        return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
+               rbd_dev->spec->snap_id == CEPH_NOSNAP &&
+               !rbd_dev->mapping.read_only;
+}
+
+static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
+{
+        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
+               rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
+}
+
+static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
+{
+        bool is_lock_owner;
+
+        down_read(&rbd_dev->lock_rwsem);
+        is_lock_owner = __rbd_is_lock_owner(rbd_dev);
+        up_read(&rbd_dev->lock_rwsem);
+        return is_lock_owner;
 }

 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
···
         obj_request_done_set(obj_request);
 }

+static const struct rbd_client_id rbd_empty_cid;
+
+static bool rbd_cid_equal(const struct rbd_client_id *lhs,
+                          const struct rbd_client_id *rhs)
+{
+        return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
+}
+
+static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
+{
+        struct rbd_client_id cid;
+
+        mutex_lock(&rbd_dev->watch_mutex);
+        cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
+        cid.handle = rbd_dev->watch_cookie;
+        mutex_unlock(&rbd_dev->watch_mutex);
+        return cid;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
+                              const struct rbd_client_id *cid)
+{
+        dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
+             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
+             cid->gid, cid->handle);
+        rbd_dev->owner_cid = *cid; /* struct */
+}
+
+static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
+{
+        mutex_lock(&rbd_dev->watch_mutex);
+        sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
+        mutex_unlock(&rbd_dev->watch_mutex);
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_lock(struct rbd_device *rbd_dev)
+{
+        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
+        char cookie[32];
+        int ret;
+
+        WARN_ON(__rbd_is_lock_owner(rbd_dev));
+
+        format_lock_cookie(rbd_dev, cookie);
+        ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
+                            RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
+                            RBD_LOCK_TAG, "", 0);
+        if (ret)
+                return ret;
+
+        rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
+        rbd_set_owner_cid(rbd_dev, &cid);
+        queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
+        return 0;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_unlock(struct rbd_device *rbd_dev)
+{
+        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+        char cookie[32];
+        int ret;
+
+        WARN_ON(!__rbd_is_lock_owner(rbd_dev));
+
+        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+
+        format_lock_cookie(rbd_dev, cookie);
+        ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
+                              RBD_LOCK_NAME, cookie);
+        if (ret && ret != -ENOENT) {
+                rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
+                return ret;
+        }
+
+        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+        queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
+        return 0;
+}
+
+static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
+                                enum rbd_notify_op notify_op,
+                                struct page ***preply_pages,
+                                size_t *preply_len)
+{
+        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
+        int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
+        char buf[buf_size];
+        void *p = buf;
+
+        dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
+
+        /* encode *LockPayload NotifyMessage (op + ClientId) */
+        ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
+        ceph_encode_32(&p, notify_op);
+        ceph_encode_64(&p, cid.gid);
+        ceph_encode_64(&p, cid.handle);
+
+        return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
+                                &rbd_dev->header_oloc, buf, buf_size,
+                                RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
+}
+
+static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
+                               enum rbd_notify_op notify_op)
+{
+        struct page **reply_pages;
+        size_t reply_len;
+
+        __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
+        ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+}
+
+static void rbd_notify_acquired_lock(struct work_struct *work)
+{
+        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+                                                  acquired_lock_work);
+
+        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
+}
+
+static void rbd_notify_released_lock(struct work_struct *work)
+{
+        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+                                                  released_lock_work);
+
+        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
+}
+
+static int rbd_request_lock(struct rbd_device *rbd_dev)
+{
+        struct page **reply_pages;
+        size_t reply_len;
+        bool lock_owner_responded = false;
+        int ret;
+
+        dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+        ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
+                                   &reply_pages, &reply_len);
+        if (ret && ret != -ETIMEDOUT) {
+                rbd_warn(rbd_dev, "failed to request lock: %d", ret);
+                goto out;
+        }
+
+        if (reply_len > 0 && reply_len <= PAGE_SIZE) {
+                void *p = page_address(reply_pages[0]);
+                void *const end = p + reply_len;
+                u32 n;
+
+                ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
+                while (n--) {
+                        u8 struct_v;
+                        u32 len;
+
+                        ceph_decode_need(&p, end, 8 + 8, e_inval);
+                        p += 8 + 8; /* skip gid and cookie */
+
+                        ceph_decode_32_safe(&p, end, len, e_inval);
+                        if (!len)
+                                continue;
+
+                        if (lock_owner_responded) {
+                                rbd_warn(rbd_dev,
+                                         "duplicate lock owners detected");
+                                ret = -EIO;
+                                goto out;
+                        }
+
+                        lock_owner_responded = true;
+                        ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
+                                                  &struct_v, &len);
+                        if (ret) {
+                                rbd_warn(rbd_dev,
+                                         "failed to decode ResponseMessage: %d",
+                                         ret);
+                                goto e_inval;
+                        }
+
+                        ret = ceph_decode_32(&p);
+                }
+        }
+
+        if (!lock_owner_responded) {
+                rbd_warn(rbd_dev, "no lock owners detected");
+                ret = -ETIMEDOUT;
+        }
+
+out:
+        ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+        return ret;
+
+e_inval:
+        ret = -EINVAL;
+        goto out;
+}
+
+static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
+{
+        dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
+
+        cancel_delayed_work(&rbd_dev->lock_dwork);
+        if (wake_all)
+                wake_up_all(&rbd_dev->lock_waitq);
+        else
+                wake_up(&rbd_dev->lock_waitq);
+}
+
+static int get_lock_owner_info(struct rbd_device *rbd_dev,
+                               struct ceph_locker **lockers, u32 *num_lockers)
+{
+        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+        u8 lock_type;
+        char *lock_tag;
+        int ret;
+
+        dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+        ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
+                                 &rbd_dev->header_oloc, RBD_LOCK_NAME,
+                                 &lock_type, &lock_tag, lockers, num_lockers);
+        if (ret)
+                return ret;
+
+        if (*num_lockers == 0) {
+                dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
+                goto out;
+        }
+
+        if (strcmp(lock_tag, RBD_LOCK_TAG)) {
+                rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
+                         lock_tag);
+                ret = -EBUSY;
+                goto out;
+        }
+
+        if (lock_type == CEPH_CLS_LOCK_SHARED) {
+                rbd_warn(rbd_dev, "shared lock type detected");
+                ret = -EBUSY;
+                goto out;
+        }
+
+        if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
+                    strlen(RBD_LOCK_COOKIE_PREFIX))) {
+                rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
+                         (*lockers)[0].id.cookie);
+                ret = -EBUSY;
+                goto out;
+        }
+
+out:
+        kfree(lock_tag);
+        return ret;
+}
+
+static int find_watcher(struct rbd_device *rbd_dev,
+                        const struct ceph_locker *locker)
+{
+        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+        struct ceph_watch_item *watchers;
+        u32 num_watchers;
+        u64 cookie;
+        int i;
+        int ret;
+
+        ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
+                                      &rbd_dev->header_oloc, &watchers,
+                                      &num_watchers);
+        if (ret)
+                return ret;
+
+        sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
+        for (i = 0; i < num_watchers; i++) {
+                if (!memcmp(&watchers[i].addr, &locker->info.addr,
+                            sizeof(locker->info.addr)) &&
+                    watchers[i].cookie == cookie) {
+                        struct rbd_client_id cid = {
+                                .gid = le64_to_cpu(watchers[i].name.num),
+                                .handle = cookie,
+                        };
+
+                        dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
+                             rbd_dev, cid.gid, cid.handle);
+                        rbd_set_owner_cid(rbd_dev, &cid);
+                        ret = 1;
+                        goto out;
+                }
+        }
+
+        dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
+        ret = 0;
+out:
+        kfree(watchers);
+        return ret;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_try_lock(struct rbd_device *rbd_dev)
+{
+        struct ceph_client *client = rbd_dev->rbd_client->client;
+        struct ceph_locker *lockers;
+        u32 num_lockers;
+        int ret;
+
+        for (;;) {
+                ret = rbd_lock(rbd_dev);
+                if (ret != -EBUSY)
+                        return ret;
+
+                /* determine if the current lock holder is still alive */
+                ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
+                if (ret)
+                        return ret;
+
+                if (num_lockers == 0)
+                        goto again;
+
+                ret = find_watcher(rbd_dev, lockers);
+                if (ret) {
+                        if (ret > 0)
+                                ret = 0; /* have to request lock */
+                        goto out;
+                }
+
+                rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
+                         ENTITY_NAME(lockers[0].id.name));
+
+                ret = ceph_monc_blacklist_add(&client->monc,
+                                              &lockers[0].info.addr);
+                if (ret) {
+                        rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
+                                 ENTITY_NAME(lockers[0].id.name), ret);
+                        goto out;
+                }
+
+                ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
+                                          &rbd_dev->header_oloc, RBD_LOCK_NAME,
+                                          lockers[0].id.cookie,
+                                          &lockers[0].id.name);
+                if (ret && ret != -ENOENT)
+                        goto out;
+
+again:
+                ceph_free_lockers(lockers, num_lockers);
+        }
+
+out:
+        ceph_free_lockers(lockers, num_lockers);
+        return ret;
+}
+
+/*
+ * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
+ */
+static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
+                                                int *pret)
+{
+        enum rbd_lock_state lock_state;
+
+        down_read(&rbd_dev->lock_rwsem);
+        dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+             rbd_dev->lock_state);
+        if (__rbd_is_lock_owner(rbd_dev)) {
+                lock_state = rbd_dev->lock_state;
+                up_read(&rbd_dev->lock_rwsem);
+                return lock_state;
+        }
+
+        up_read(&rbd_dev->lock_rwsem);
+        down_write(&rbd_dev->lock_rwsem);
+        dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
+             rbd_dev->lock_state);
+        if (!__rbd_is_lock_owner(rbd_dev)) {
+                *pret = rbd_try_lock(rbd_dev);
+                if (*pret)
+                        rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
+        }
+
+        lock_state = rbd_dev->lock_state;
+        up_write(&rbd_dev->lock_rwsem);
+        return lock_state;
+}
+
+static void rbd_acquire_lock(struct work_struct *work)
+{
+        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
+                                            struct rbd_device, lock_dwork);
+        enum rbd_lock_state lock_state;
+        int ret;
+
+        dout("%s rbd_dev %p\n", __func__, rbd_dev);
+again:
+        lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
+        if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
+                if (lock_state == RBD_LOCK_STATE_LOCKED)
+                        wake_requests(rbd_dev, true);
+                dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
+                     rbd_dev, lock_state, ret);
+                return;
+        }
+
+        ret = rbd_request_lock(rbd_dev);
+        if (ret == -ETIMEDOUT) {
+                goto again; /* treat this as a dead client */
+        } else if (ret < 0) {
+                rbd_warn(rbd_dev, "error requesting lock: %d", ret);
+                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+                                 RBD_RETRY_DELAY);
+        } else {
+                /*
+                 * lock owner acked, but resend if we don't see them
+                 * release the lock
+                 */
+                dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
+                     rbd_dev);
+                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+                    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
+        }
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static bool rbd_release_lock(struct rbd_device *rbd_dev)
+{
+        dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+             rbd_dev->lock_state);
+        if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
+                return false;
+
+        rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
+        downgrade_write(&rbd_dev->lock_rwsem);
+        /*
+         * Ensure that all in-flight IO is flushed.
+         *
+         * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
+         * may be shared with other devices.
+         */
+        ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
+        up_read(&rbd_dev->lock_rwsem);
+
+        down_write(&rbd_dev->lock_rwsem);
+        dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
+             rbd_dev->lock_state);
+        if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
+                return false;
+
+        if (!rbd_unlock(rbd_dev))
+                /*
+                 * Give others a chance to grab the lock - we would re-acquire
+                 * almost immediately if we got new IO during ceph_osdc_sync()
+                 * otherwise.  We need to ack our own notifications, so this
+                 * lock_dwork will be requeued from rbd_wait_state_locked()
+                 * after wake_requests() in rbd_handle_released_lock().
+                 */
+                cancel_delayed_work(&rbd_dev->lock_dwork);
+
+        return true;
+}
+
+static void rbd_release_lock_work(struct work_struct *work)
+{
+        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+                                                  unlock_work);
+
+        down_write(&rbd_dev->lock_rwsem);
+        rbd_release_lock(rbd_dev);
+        up_write(&rbd_dev->lock_rwsem);
+}
+
+static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
+                                     void **p)
+{
+        struct rbd_client_id cid = { 0 };
+
+        if (struct_v >= 2) {
+                cid.gid = ceph_decode_64(p);
+                cid.handle = ceph_decode_64(p);
+        }
+
+        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+             cid.handle);
+        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
+                down_write(&rbd_dev->lock_rwsem);
+                if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
+                        /*
+                         * we already know that the remote client is
+                         * the owner
+                         */
+                        up_write(&rbd_dev->lock_rwsem);
+                        return;
+                }
+
+                rbd_set_owner_cid(rbd_dev, &cid);
+                downgrade_write(&rbd_dev->lock_rwsem);
+        } else {
+                down_read(&rbd_dev->lock_rwsem);
+        }
+
+        if (!__rbd_is_lock_owner(rbd_dev))
+                wake_requests(rbd_dev, false);
+        up_read(&rbd_dev->lock_rwsem);
+}
+
+static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
+                                     void **p)
+{
+        struct rbd_client_id cid = { 0 };
+
+        if (struct_v >= 2) {
+                cid.gid = ceph_decode_64(p);
+                cid.handle = ceph_decode_64(p);
+        }
+
+        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+             cid.handle);
+        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
+                down_write(&rbd_dev->lock_rwsem);
+                if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
+                        dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
+                             __func__, rbd_dev, cid.gid, cid.handle,
+                             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
+                        up_write(&rbd_dev->lock_rwsem);
+                        return;
+                }
+
+                rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+                downgrade_write(&rbd_dev->lock_rwsem);
+        } else {
+                down_read(&rbd_dev->lock_rwsem);
+        }
+
+        if (!__rbd_is_lock_owner(rbd_dev))
+                wake_requests(rbd_dev, false);
+        up_read(&rbd_dev->lock_rwsem);
+}
+
+static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
+                                    void **p)
+{
+        struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
+        struct rbd_client_id cid = { 0 };
+        bool need_to_send;
+
+        if (struct_v >= 2) {
+                cid.gid = ceph_decode_64(p);
+                cid.handle = ceph_decode_64(p);
+        }
+
+        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+             cid.handle);
+        if (rbd_cid_equal(&cid, &my_cid))
+                return false;
+
+        down_read(&rbd_dev->lock_rwsem);
+        need_to_send = __rbd_is_lock_owner(rbd_dev);
+        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
+                if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
+                        dout("%s rbd_dev %p queueing unlock_work\n", __func__,
+                             rbd_dev);
+                        queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+                }
+        }
+        up_read(&rbd_dev->lock_rwsem);
+        return need_to_send;
+}
+
+static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
+                                     u64 notify_id, u64 cookie, s32 *result)
+{
+        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+        int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
+        char buf[buf_size];
+        int ret;
+
+        if (result) {
+                void *p = buf;
+
+                /* encode ResponseMessage */
+                ceph_start_encoding(&p, 1, 1,
+                                    buf_size - CEPH_ENCODING_START_BLK_LEN);
+                ceph_encode_32(&p, *result);
+        } else {
+                buf_size = 0;
+        }
+
+        ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
+                                   &rbd_dev->header_oloc, notify_id, cookie,
+                                   buf, buf_size);
+        if (ret)
+                rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
+}
+
+static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
+                                   u64 cookie)
+{
+        dout("%s rbd_dev %p\n", __func__, rbd_dev);
+        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
+}
+
+static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
+                                          u64 notify_id, u64 cookie, s32 result)
+{
+        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
+        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
+}
+
 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
                          u64 notifier_id, void *data, size_t data_len)
 {
         struct rbd_device *rbd_dev = arg;
-        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+        void *p = data;
+        void *const end = p + data_len;
+        u8 struct_v;
+        u32 len;
+        u32 notify_op;
         int ret;

-        dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev,
-             cookie, notify_id);
+        dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
+             __func__, rbd_dev, cookie, notify_id, data_len);
+        if (data_len) {
+                ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
+                                          &struct_v, &len);
+                if (ret) {
+                        rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
+                                 ret);
+                        return;
+                }

-        /*
-         * Until adequate refresh error handling is in place, there is
-         * not much we can do here, except warn.
-         *
-         * See http://tracker.ceph.com/issues/5040
-         */
-        ret = rbd_dev_refresh(rbd_dev);
-        if (ret)
-                rbd_warn(rbd_dev, "refresh failed: %d", ret);
+                notify_op = ceph_decode_32(&p);
+        } else {
+                /* legacy notification for header updates */
+                notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
+                len = 0;
+        }

-        ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
-                                   &rbd_dev->header_oloc, notify_id, cookie,
-                                   NULL, 0);
-        if (ret)
-                rbd_warn(rbd_dev, "notify_ack ret %d", ret);
+        dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
+        switch (notify_op) {
+        case RBD_NOTIFY_OP_ACQUIRED_LOCK:
+                rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
+                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+                break;
+        case RBD_NOTIFY_OP_RELEASED_LOCK:
+                rbd_handle_released_lock(rbd_dev, struct_v, &p);
+                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+                break;
+        case RBD_NOTIFY_OP_REQUEST_LOCK:
+                if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
+                        /*
+                         * send ResponseMessage(0) back so the client
+                         * can detect a missing owner
+                         */
+                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
+                                                      cookie, 0);
+                else
+                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+                break;
+        case RBD_NOTIFY_OP_HEADER_UPDATE:
+                ret = rbd_dev_refresh(rbd_dev);
+                if (ret)
+                        rbd_warn(rbd_dev, "refresh failed: %d", ret);
+
+                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+                break;
+        default:
+                if (rbd_is_lock_owner(rbd_dev))
+                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
+                                                      cookie, -EOPNOTSUPP);
+                else
+                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+                break;
+        }
 }

 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
···
         struct rbd_device *rbd_dev = arg;

         rbd_warn(rbd_dev, "encountered watch error: %d", err);
+
+        down_write(&rbd_dev->lock_rwsem);
+        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+        up_write(&rbd_dev->lock_rwsem);

         mutex_lock(&rbd_dev->watch_mutex);
         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
···
         dout("%s rbd_dev %p\n", __func__, rbd_dev);

         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
+        cancel_work_sync(&rbd_dev->acquired_lock_work);
+        cancel_work_sync(&rbd_dev->released_lock_work);
+        cancel_delayed_work_sync(&rbd_dev->lock_dwork);
+        cancel_work_sync(&rbd_dev->unlock_work);
 }

 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
 {
+        WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
         cancel_tasks_sync(rbd_dev);

         mutex_lock(&rbd_dev->watch_mutex);
···
 {
         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
                                             struct rbd_device, watch_dwork);
+        bool was_lock_owner = false;
         int ret;

         dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+        down_write(&rbd_dev->lock_rwsem);
+        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+                was_lock_owner = rbd_release_lock(rbd_dev);

         mutex_lock(&rbd_dev->watch_mutex);
         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
···
         if (ret)
                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);

+        if (was_lock_owner) {
+                ret = rbd_try_lock(rbd_dev);
+                if (ret)
+                        rbd_warn(rbd_dev, "reregistration lock failed: %d",
+                                 ret);
+        }
+
+        up_write(&rbd_dev->lock_rwsem);
+        wake_requests(rbd_dev, true);
         return;

fail_unlock:
         mutex_unlock(&rbd_dev->watch_mutex);
+        up_write(&rbd_dev->lock_rwsem);
 }

 /*
···
         return ret;
 }

+/*
+ * lock_rwsem must be held for read
+ */
+static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
+{
+        DEFINE_WAIT(wait);
+
+        do {
+                /*
+                 * Note the use of mod_delayed_work() in rbd_acquire_lock()
+                 * and cancel_delayed_work() in wake_requests().
+                 */
+                dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
+                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+                prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
+                                          TASK_UNINTERRUPTIBLE);
+                up_read(&rbd_dev->lock_rwsem);
+                schedule();
+                down_read(&rbd_dev->lock_rwsem);
+        } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+        finish_wait(&rbd_dev->lock_waitq, &wait);
+}
+
 static void rbd_queue_workfn(struct work_struct *work)
 {
         struct request *rq = blk_mq_rq_from_pdu(work);
···
         u64 length = blk_rq_bytes(rq);
         enum obj_operation_type op_type;
         u64 mapping_size;
+        bool must_be_locked = false;
         int result;

         if (rq->cmd_type != REQ_TYPE_FS) {
···
         if (op_type != OBJ_OP_READ) {
                 snapc = rbd_dev->header.snapc;
                 ceph_get_snap_context(snapc);
+                must_be_locked = rbd_is_lock_supported(rbd_dev);
         }
         up_read(&rbd_dev->header_rwsem);
···
                 goto err_rq;
         }

+        if (must_be_locked) {
+                down_read(&rbd_dev->lock_rwsem);
+                if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
+                        rbd_wait_state_locked(rbd_dev);
+        }
+
         img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
                                              snapc);
         if (!img_request) {
                 result = -ENOMEM;
-                goto err_rq;
+                goto err_unlock;
         }
         img_request->rq = rq;
         snapc = NULL; /* img_request consumes a ref */
···
         if (result)
                 goto err_img_request;

+        if (must_be_locked)
+                up_read(&rbd_dev->lock_rwsem);
         return;

 err_img_request:
         rbd_img_request_put(img_request);
+err_unlock:
+        if (must_be_locked)
+                up_read(&rbd_dev->lock_rwsem);
 err_rq:
         if (result)
                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
···
 static void rbd_dev_free(struct rbd_device *rbd_dev)
 {
         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
+        WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);

         ceph_oid_destroy(&rbd_dev->header_oid);
         ceph_oloc_destroy(&rbd_dev->header_oloc);
···
         mutex_init(&rbd_dev->watch_mutex);
         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
+
+        init_rwsem(&rbd_dev->lock_rwsem);
+        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+        INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
+        INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
+        INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
+        INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
+        init_waitqueue_head(&rbd_dev->lock_waitq);

         rbd_dev->dev.bus = &rbd_bus_type;
         rbd_dev->dev.type = &rbd_device_type;
···
         if (ret < 0 || already)
                 return ret;

+        down_write(&rbd_dev->lock_rwsem);
+        if (__rbd_is_lock_owner(rbd_dev))
+                rbd_unlock(rbd_dev);
+        up_write(&rbd_dev->lock_rwsem);
         rbd_unregister_watch(rbd_dev);

         /*
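
A note on the wire format: the NotifyMessage payload built by
__rbd_notify_op_lock() is a fixed 26 bytes - a versioned-encoding header
(struct_v, struct_compat, le32 payload length; CEPH_ENCODING_START_BLK_LEN is
assumed here to be the usual 1 + 1 + 4 = 6 bytes) followed by the le32 notify
op and the 8 + 8 byte ClientId. A standalone userspace sketch of the same
layout (little-endian host assumed; the function name is hypothetical):

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Mirrors the buffer __rbd_notify_op_lock() hands to ceph_osdc_notify(). */
static size_t encode_lock_notify(uint8_t *buf, uint32_t notify_op,
                                 uint64_t gid, uint64_t handle)
{
        uint8_t *p = buf;
        uint32_t payload_len = 4 + 8 + 8;   /* notify_op + ClientId */

        *p++ = 2;                           /* struct_v */
        *p++ = 1;                           /* struct_compat */
        memcpy(p, &payload_len, 4); p += 4; /* struct_len */
        memcpy(p, &notify_op, 4); p += 4;
        memcpy(p, &gid, 8); p += 8;
        memcpy(p, &handle, 8); p += 8;
        return (size_t)(p - buf);           /* 26 */
}

int main(void)
{
        uint8_t buf[26];

        printf("encoded %zu bytes\n",
               encode_lock_notify(buf, 2 /* REQUEST_LOCK */, 4242, 1));
        return 0;
}

ceph_osdc_notify() delivers this buffer to every watcher of the header
object; rbd_watch_cb() on the receiving side decodes the same fields.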
drivers/block/rbd_types.h (+11)
···
 #define RBD_DATA_PREFIX         "rbd_data."
 #define RBD_ID_PREFIX           "rbd_id."

+#define RBD_LOCK_NAME           "rbd_lock"
+#define RBD_LOCK_TAG            "internal"
+#define RBD_LOCK_COOKIE_PREFIX  "auto"
+
+enum rbd_notify_op {
+        RBD_NOTIFY_OP_ACQUIRED_LOCK = 0,
+        RBD_NOTIFY_OP_RELEASED_LOCK = 1,
+        RBD_NOTIFY_OP_REQUEST_LOCK = 2,
+        RBD_NOTIFY_OP_HEADER_UPDATE = 3,
+};
+
 /*
  * For format version 1, rbd image 'foo' consists of objects
  *   foo.rbd - image metadata
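
These constants define how kernel-client locks are recognized: the lock is
tagged "internal" and its cookie is the "auto" prefix plus the watch cookie
(see format_lock_cookie() and find_watcher() in rbd.c above); a lock with any
other tag or cookie prefix is treated as an external locker and fails with
-EBUSY. A tiny userspace round-trip of the cookie format, illustrative only:

#include <stdio.h>

#define RBD_LOCK_COOKIE_PREFIX "auto"

int main(void)
{
        char cookie[32];        /* "auto " + 20-digit u64 + NUL = 26 max */
        unsigned long long watch_cookie = 18446744073709551615ULL, parsed;

        /* as in format_lock_cookie() ... */
        sprintf(cookie, "%s %llu", RBD_LOCK_COOKIE_PREFIX, watch_cookie);
        /* ... and parsed back as in find_watcher() */
        sscanf(cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &parsed);
        printf("%s -> %llu\n", cookie, parsed);
        return 0;
}

The 32-byte cookie buffers in rbd.c are sized for exactly this worst case.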
net/ceph/ceph_strings.c (+1)
···
         default: return "unknown";
         }
 }
+EXPORT_SYMBOL(ceph_entity_type_name);

 const char *ceph_osd_op_name(int op)
 {