Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

libceph: add lingering request and watch/notify event framework

Lingering requests are requests that are sent to the OSD normally but
tracked also after we get a successful response. This keeps the OSD
connection open and resends the original request if the object moves to
another OSD. The OSD can then send notification messages back to us
if another client initiates a notify.

This framework will be used by RBD so that the client gets notification
when a snapshot is created by another node or tool.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>

Authored by Yehuda Sadeh; committed by Sage Weil.
a40c4f10 55b00bae

+426 -12
+52
include/linux/ceph/osd_client.h
··· 32 32 struct rb_node o_node; 33 33 struct ceph_connection o_con; 34 34 struct list_head o_requests; 35 + struct list_head o_linger_requests; 35 36 struct list_head o_osd_lru; 36 37 struct ceph_authorizer *o_authorizer; 37 38 void *o_authorizer_buf, *o_authorizer_reply_buf; ··· 48 47 struct rb_node r_node; 49 48 struct list_head r_req_lru_item; 50 49 struct list_head r_osd_item; 50 + struct list_head r_linger_item; 51 + struct list_head r_linger_osd; 51 52 struct ceph_osd *r_osd; 52 53 struct ceph_pg r_pgid; 53 54 int r_pg_osds[CEPH_PG_MAX_SIZE]; ··· 62 59 int r_flags; /* any additional flags for the osd */ 63 60 u32 r_sent; /* >0 if r_request is sending/sent */ 64 61 int r_got_reply; 62 + int r_linger; 65 63 66 64 struct ceph_osd_client *r_osdc; 67 65 struct kref r_kref; ··· 93 89 struct ceph_pagelist *r_trail; /* trailing part of the data */ 94 90 }; 95 91 92 + struct ceph_osd_event { 93 + u64 cookie; 94 + int one_shot; 95 + struct ceph_osd_client *osdc; 96 + void (*cb)(u64, u64, u8, void *); 97 + void *data; 98 + struct rb_node node; 99 + struct list_head osd_node; 100 + struct kref kref; 101 + struct completion completion; 102 + }; 103 + 104 + struct ceph_osd_event_work { 105 + struct work_struct work; 106 + struct ceph_osd_event *event; 107 + u64 ver; 108 + u64 notify_id; 109 + u8 opcode; 110 + }; 111 + 96 112 struct ceph_osd_client { 97 113 struct ceph_client *client; 98 114 ··· 130 106 struct list_head req_lru; /* in-flight lru */ 131 107 struct list_head req_unsent; /* unsent/need-resend queue */ 132 108 struct list_head req_notarget; /* map to no osd */ 109 + struct list_head req_linger; /* lingering requests */ 133 110 int num_requests; 134 111 struct delayed_work timeout_work; 135 112 struct delayed_work osds_timeout_work; ··· 142 117 143 118 struct ceph_msgpool msgpool_op; 144 119 struct ceph_msgpool msgpool_op_reply; 120 + 121 + spinlock_t event_lock; 122 + struct rb_root event_tree; 123 + u64 event_count; 124 + 125 + struct workqueue_struct 
*notify_wq; 145 126 }; 146 127 147 128 struct ceph_osd_req_op { ··· 182 151 struct { 183 152 u64 snapid; 184 153 } snap; 154 + struct { 155 + u64 cookie; 156 + u64 ver; 157 + __u8 flag; 158 + u32 prot_ver; 159 + u32 timeout; 160 + } watch; 185 161 }; 186 162 u32 payload_len; 187 163 }; ··· 237 199 bool use_mempool, int num_reply, 238 200 int page_align); 239 201 202 + extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, 203 + struct ceph_osd_request *req); 204 + extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, 205 + struct ceph_osd_request *req); 206 + 240 207 static inline void ceph_osdc_get_request(struct ceph_osd_request *req) 241 208 { 242 209 kref_get(&req->r_kref); ··· 277 234 struct page **pages, int nr_pages, 278 235 int flags, int do_sync, bool nofail); 279 236 237 + /* watch/notify events */ 238 + extern int ceph_osdc_create_event(struct ceph_osd_client *osdc, 239 + void (*event_cb)(u64, u64, u8, void *), 240 + int one_shot, void *data, 241 + struct ceph_osd_event **pevent); 242 + extern void ceph_osdc_cancel_event(struct ceph_osd_event *event); 243 + extern int ceph_osdc_wait_event(struct ceph_osd_event *event, 244 + unsigned long timeout); 245 + extern void ceph_osdc_put_event(struct ceph_osd_event *event); 280 246 #endif 281 247
+1
net/ceph/ceph_common.c
··· 62 62 case CEPH_MSG_OSD_MAP: return "osd_map"; 63 63 case CEPH_MSG_OSD_OP: return "osd_op"; 64 64 case CEPH_MSG_OSD_OPREPLY: return "osd_opreply"; 65 + case CEPH_MSG_WATCH_NOTIFY: return "watch_notify"; 65 66 default: return "unknown"; 66 67 } 67 68 }
+373 -12
net/ceph/osd_client.c
··· 25 25 26 26 static void send_queued(struct ceph_osd_client *osdc); 27 27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 28 + static void __register_request(struct ceph_osd_client *osdc, 29 + struct ceph_osd_request *req); 30 + static void __unregister_linger_request(struct ceph_osd_client *osdc, 31 + struct ceph_osd_request *req); 32 + static int __send_request(struct ceph_osd_client *osdc, 33 + struct ceph_osd_request *req); 28 34 29 35 static int op_needs_trail(int op) 30 36 { ··· 39 33 case CEPH_OSD_OP_SETXATTR: 40 34 case CEPH_OSD_OP_CMPXATTR: 41 35 case CEPH_OSD_OP_CALL: 36 + case CEPH_OSD_OP_NOTIFY: 42 37 return 1; 43 38 default: 44 39 return 0; ··· 215 208 init_completion(&req->r_completion); 216 209 init_completion(&req->r_safe_completion); 217 210 INIT_LIST_HEAD(&req->r_unsafe_item); 211 + INIT_LIST_HEAD(&req->r_linger_item); 212 + INIT_LIST_HEAD(&req->r_linger_osd); 218 213 req->r_flags = flags; 219 214 220 215 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); ··· 322 313 dst->snap.snapid = cpu_to_le64(src->snap.snapid); 323 314 break; 324 315 case CEPH_OSD_OP_STARTSYNC: 316 + break; 317 + case CEPH_OSD_OP_NOTIFY: 318 + { 319 + __le32 prot_ver = cpu_to_le32(src->watch.prot_ver); 320 + __le32 timeout = cpu_to_le32(src->watch.timeout); 321 + 322 + BUG_ON(!req->r_trail); 323 + 324 + ceph_pagelist_append(req->r_trail, 325 + &prot_ver, sizeof(prot_ver)); 326 + ceph_pagelist_append(req->r_trail, 327 + &timeout, sizeof(timeout)); 328 + } 329 + case CEPH_OSD_OP_NOTIFY_ACK: 330 + case CEPH_OSD_OP_WATCH: 331 + dst->watch.cookie = cpu_to_le64(src->watch.cookie); 332 + dst->watch.ver = cpu_to_le64(src->watch.ver); 333 + dst->watch.flag = src->watch.flag; 325 334 break; 326 335 default: 327 336 pr_err("unrecognized osd opcode %d\n", dst->op); ··· 561 534 static void __kick_osd_requests(struct ceph_osd_client *osdc, 562 535 struct ceph_osd *osd) 563 536 { 564 - struct ceph_osd_request *req; 537 + struct ceph_osd_request 
*req, *nreq; 565 538 int err; 566 539 567 540 dout("__kick_osd_requests osd%d\n", osd->o_osd); ··· 573 546 list_move(&req->r_req_lru_item, &osdc->req_unsent); 574 547 dout("requeued %p tid %llu osd%d\n", req, req->r_tid, 575 548 osd->o_osd); 576 - req->r_flags |= CEPH_OSD_FLAG_RETRY; 549 + if (!req->r_linger) 550 + req->r_flags |= CEPH_OSD_FLAG_RETRY; 551 + } 552 + 553 + list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 554 + r_linger_osd) { 555 + __unregister_linger_request(osdc, req); 556 + __register_request(osdc, req); 557 + list_move(&req->r_req_lru_item, &osdc->req_unsent); 558 + dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, 559 + osd->o_osd); 577 560 } 578 561 } 579 562 ··· 627 590 atomic_set(&osd->o_ref, 1); 628 591 osd->o_osdc = osdc; 629 592 INIT_LIST_HEAD(&osd->o_requests); 593 + INIT_LIST_HEAD(&osd->o_linger_requests); 630 594 INIT_LIST_HEAD(&osd->o_osd_lru); 631 595 osd->o_incarnation = 1; 632 596 ··· 717 679 int ret = 0; 718 680 719 681 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 720 - if (list_empty(&osd->o_requests)) { 682 + if (list_empty(&osd->o_requests) && 683 + list_empty(&osd->o_linger_requests)) { 721 684 __remove_osd(osdc, osd); 722 685 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 723 686 &osd->o_con.peer_addr, ··· 791 752 * Register request, assign tid. If this is the first request, set up 792 753 * the timeout event. 
793 754 */ 794 - static void register_request(struct ceph_osd_client *osdc, 795 - struct ceph_osd_request *req) 755 + static void __register_request(struct ceph_osd_client *osdc, 756 + struct ceph_osd_request *req) 796 757 { 797 - mutex_lock(&osdc->request_mutex); 798 758 req->r_tid = ++osdc->last_tid; 799 759 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 800 760 INIT_LIST_HEAD(&req->r_req_lru_item); ··· 807 769 dout(" first request, scheduling timeout\n"); 808 770 __schedule_osd_timeout(osdc); 809 771 } 772 + } 773 + 774 + static void register_request(struct ceph_osd_client *osdc, 775 + struct ceph_osd_request *req) 776 + { 777 + mutex_lock(&osdc->request_mutex); 778 + __register_request(osdc, req); 810 779 mutex_unlock(&osdc->request_mutex); 811 780 } 812 781 ··· 832 787 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 833 788 834 789 list_del_init(&req->r_osd_item); 835 - if (list_empty(&req->r_osd->o_requests)) 790 + if (list_empty(&req->r_osd->o_requests) && 791 + list_empty(&req->r_osd->o_linger_requests)) { 792 + dout("moving osd to %p lru\n", req->r_osd); 836 793 __move_osd_to_lru(osdc, req->r_osd); 837 - req->r_osd = NULL; 794 + } 795 + if (list_empty(&req->r_osd_item) && 796 + list_empty(&req->r_linger_item)) 797 + req->r_osd = NULL; 838 798 } 839 799 840 800 ceph_osdc_put_request(req); ··· 861 811 req->r_sent = 0; 862 812 } 863 813 } 814 + 815 + static void __register_linger_request(struct ceph_osd_client *osdc, 816 + struct ceph_osd_request *req) 817 + { 818 + dout("__register_linger_request %p\n", req); 819 + list_add_tail(&req->r_linger_item, &osdc->req_linger); 820 + list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests); 821 + } 822 + 823 + static void __unregister_linger_request(struct ceph_osd_client *osdc, 824 + struct ceph_osd_request *req) 825 + { 826 + dout("__unregister_linger_request %p\n", req); 827 + if (req->r_osd) { 828 + list_del_init(&req->r_linger_item); 829 + list_del_init(&req->r_linger_osd); 830 + 831 + if 
(list_empty(&req->r_osd->o_requests) && 832 + list_empty(&req->r_osd->o_linger_requests)) { 833 + dout("moving osd to %p lru\n", req->r_osd); 834 + __move_osd_to_lru(osdc, req->r_osd); 835 + } 836 + req->r_osd = NULL; 837 + } 838 + } 839 + 840 + void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, 841 + struct ceph_osd_request *req) 842 + { 843 + mutex_lock(&osdc->request_mutex); 844 + if (req->r_linger) { 845 + __unregister_linger_request(osdc, req); 846 + ceph_osdc_put_request(req); 847 + } 848 + mutex_unlock(&osdc->request_mutex); 849 + } 850 + EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); 851 + 852 + void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, 853 + struct ceph_osd_request *req) 854 + { 855 + if (!req->r_linger) { 856 + dout("set_request_linger %p\n", req); 857 + req->r_linger = 1; 858 + /* 859 + * caller is now responsible for calling 860 + * unregister_linger_request 861 + */ 862 + ceph_osdc_get_request(req); 863 + } 864 + } 865 + EXPORT_SYMBOL(ceph_osdc_set_request_linger); 864 866 865 867 /* 866 868 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct ··· 1060 958 osdc->client->options->osd_keepalive_timeout * HZ; 1061 959 unsigned long last_stamp = 0; 1062 960 struct list_head slow_osds; 1063 - 1064 961 dout("timeout\n"); 1065 962 down_read(&osdc->map_sem); 1066 963 ··· 1161 1060 numops * sizeof(struct ceph_osd_op)) 1162 1061 goto bad; 1163 1062 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1164 - 1165 1063 /* lookup */ 1166 1064 mutex_lock(&osdc->request_mutex); 1167 1065 req = __lookup_request(osdc, tid); ··· 1204 1104 1205 1105 dout("handle_reply tid %llu flags %d\n", tid, flags); 1206 1106 1107 + if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1108 + __register_linger_request(osdc, req); 1109 + 1207 1110 /* either this is a read, or we got the safe response */ 1208 1111 if (result < 0 || 1209 1112 (flags & CEPH_OSD_FLAG_ONDISK) || ··· 1227 1124 } 1228 1125 1229 
1126 done: 1127 + dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1230 1128 ceph_osdc_put_request(req); 1231 1129 return; 1232 1130 ··· 1263 1159 */ 1264 1160 static void kick_requests(struct ceph_osd_client *osdc) 1265 1161 { 1266 - struct ceph_osd_request *req; 1162 + struct ceph_osd_request *req, *nreq; 1267 1163 struct rb_node *p; 1268 1164 int needmap = 0; 1269 1165 int err; ··· 1281 1177 } else if (err > 0) { 1282 1178 dout("%p tid %llu requeued on osd%d\n", req, req->r_tid, 1283 1179 req->r_osd ? req->r_osd->o_osd : -1); 1284 - req->r_flags |= CEPH_OSD_FLAG_RETRY; 1180 + if (!req->r_linger) 1181 + req->r_flags |= CEPH_OSD_FLAG_RETRY; 1285 1182 } 1183 + } 1184 + 1185 + list_for_each_entry_safe(req, nreq, &osdc->req_linger, 1186 + r_linger_item) { 1187 + dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1188 + 1189 + err = __map_request(osdc, req); 1190 + if (err == 0) 1191 + continue; /* no change and no osd was specified */ 1192 + if (err < 0) 1193 + continue; /* hrm! */ 1194 + if (req->r_osd == NULL) { 1195 + dout("tid %llu maps to no valid osd\n", req->r_tid); 1196 + needmap++; /* request a newer map */ 1197 + continue; 1198 + } 1199 + 1200 + dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1201 + req->r_osd ? req->r_osd->o_osd : -1); 1202 + __unregister_linger_request(osdc, req); 1203 + __register_request(osdc, req); 1286 1204 } 1287 1205 mutex_unlock(&osdc->request_mutex); 1288 1206 ··· 1426 1300 up_write(&osdc->map_sem); 1427 1301 return; 1428 1302 } 1303 + 1304 + /* 1305 + * watch/notify callback event infrastructure 1306 + * 1307 + * These callbacks are used both for watch and notify operations. 
1308 + */ 1309 + static void __release_event(struct kref *kref) 1310 + { 1311 + struct ceph_osd_event *event = 1312 + container_of(kref, struct ceph_osd_event, kref); 1313 + 1314 + dout("__release_event %p\n", event); 1315 + kfree(event); 1316 + } 1317 + 1318 + static void get_event(struct ceph_osd_event *event) 1319 + { 1320 + kref_get(&event->kref); 1321 + } 1322 + 1323 + void ceph_osdc_put_event(struct ceph_osd_event *event) 1324 + { 1325 + kref_put(&event->kref, __release_event); 1326 + } 1327 + EXPORT_SYMBOL(ceph_osdc_put_event); 1328 + 1329 + static void __insert_event(struct ceph_osd_client *osdc, 1330 + struct ceph_osd_event *new) 1331 + { 1332 + struct rb_node **p = &osdc->event_tree.rb_node; 1333 + struct rb_node *parent = NULL; 1334 + struct ceph_osd_event *event = NULL; 1335 + 1336 + while (*p) { 1337 + parent = *p; 1338 + event = rb_entry(parent, struct ceph_osd_event, node); 1339 + if (new->cookie < event->cookie) 1340 + p = &(*p)->rb_left; 1341 + else if (new->cookie > event->cookie) 1342 + p = &(*p)->rb_right; 1343 + else 1344 + BUG(); 1345 + } 1346 + 1347 + rb_link_node(&new->node, parent, p); 1348 + rb_insert_color(&new->node, &osdc->event_tree); 1349 + } 1350 + 1351 + static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, 1352 + u64 cookie) 1353 + { 1354 + struct rb_node **p = &osdc->event_tree.rb_node; 1355 + struct rb_node *parent = NULL; 1356 + struct ceph_osd_event *event = NULL; 1357 + 1358 + while (*p) { 1359 + parent = *p; 1360 + event = rb_entry(parent, struct ceph_osd_event, node); 1361 + if (cookie < event->cookie) 1362 + p = &(*p)->rb_left; 1363 + else if (cookie > event->cookie) 1364 + p = &(*p)->rb_right; 1365 + else 1366 + return event; 1367 + } 1368 + return NULL; 1369 + } 1370 + 1371 + static void __remove_event(struct ceph_osd_event *event) 1372 + { 1373 + struct ceph_osd_client *osdc = event->osdc; 1374 + 1375 + if (!RB_EMPTY_NODE(&event->node)) { 1376 + dout("__remove_event removed %p\n", event); 1377 + 
rb_erase(&event->node, &osdc->event_tree); 1378 + ceph_osdc_put_event(event); 1379 + } else { 1380 + dout("__remove_event didn't remove %p\n", event); 1381 + } 1382 + } 1383 + 1384 + int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1385 + void (*event_cb)(u64, u64, u8, void *), 1386 + int one_shot, void *data, 1387 + struct ceph_osd_event **pevent) 1388 + { 1389 + struct ceph_osd_event *event; 1390 + 1391 + event = kmalloc(sizeof(*event), GFP_NOIO); 1392 + if (!event) 1393 + return -ENOMEM; 1394 + 1395 + dout("create_event %p\n", event); 1396 + event->cb = event_cb; 1397 + event->one_shot = one_shot; 1398 + event->data = data; 1399 + event->osdc = osdc; 1400 + INIT_LIST_HEAD(&event->osd_node); 1401 + kref_init(&event->kref); /* one ref for us */ 1402 + kref_get(&event->kref); /* one ref for the caller */ 1403 + init_completion(&event->completion); 1404 + 1405 + spin_lock(&osdc->event_lock); 1406 + event->cookie = ++osdc->event_count; 1407 + __insert_event(osdc, event); 1408 + spin_unlock(&osdc->event_lock); 1409 + 1410 + *pevent = event; 1411 + return 0; 1412 + } 1413 + EXPORT_SYMBOL(ceph_osdc_create_event); 1414 + 1415 + void ceph_osdc_cancel_event(struct ceph_osd_event *event) 1416 + { 1417 + struct ceph_osd_client *osdc = event->osdc; 1418 + 1419 + dout("cancel_event %p\n", event); 1420 + spin_lock(&osdc->event_lock); 1421 + __remove_event(event); 1422 + spin_unlock(&osdc->event_lock); 1423 + ceph_osdc_put_event(event); /* caller's */ 1424 + } 1425 + EXPORT_SYMBOL(ceph_osdc_cancel_event); 1426 + 1427 + 1428 + static void do_event_work(struct work_struct *work) 1429 + { 1430 + struct ceph_osd_event_work *event_work = 1431 + container_of(work, struct ceph_osd_event_work, work); 1432 + struct ceph_osd_event *event = event_work->event; 1433 + u64 ver = event_work->ver; 1434 + u64 notify_id = event_work->notify_id; 1435 + u8 opcode = event_work->opcode; 1436 + 1437 + dout("do_event_work completing %p\n", event); 1438 + event->cb(ver, notify_id, opcode, 
event->data); 1439 + complete(&event->completion); 1440 + dout("do_event_work completed %p\n", event); 1441 + ceph_osdc_put_event(event); 1442 + kfree(event_work); 1443 + } 1444 + 1445 + 1446 + /* 1447 + * Process osd watch notifications 1448 + */ 1449 + void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1450 + { 1451 + void *p, *end; 1452 + u8 proto_ver; 1453 + u64 cookie, ver, notify_id; 1454 + u8 opcode; 1455 + struct ceph_osd_event *event; 1456 + struct ceph_osd_event_work *event_work; 1457 + 1458 + p = msg->front.iov_base; 1459 + end = p + msg->front.iov_len; 1460 + 1461 + ceph_decode_8_safe(&p, end, proto_ver, bad); 1462 + ceph_decode_8_safe(&p, end, opcode, bad); 1463 + ceph_decode_64_safe(&p, end, cookie, bad); 1464 + ceph_decode_64_safe(&p, end, ver, bad); 1465 + ceph_decode_64_safe(&p, end, notify_id, bad); 1466 + 1467 + spin_lock(&osdc->event_lock); 1468 + event = __find_event(osdc, cookie); 1469 + if (event) { 1470 + get_event(event); 1471 + if (event->one_shot) 1472 + __remove_event(event); 1473 + } 1474 + spin_unlock(&osdc->event_lock); 1475 + dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1476 + cookie, ver, event); 1477 + if (event) { 1478 + event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 1479 + INIT_WORK(&event_work->work, do_event_work); 1480 + if (!event_work) { 1481 + dout("ERROR: could not allocate event_work\n"); 1482 + goto done_err; 1483 + } 1484 + event_work->event = event; 1485 + event_work->ver = ver; 1486 + event_work->notify_id = notify_id; 1487 + event_work->opcode = opcode; 1488 + if (!queue_work(osdc->notify_wq, &event_work->work)) { 1489 + dout("WARNING: failed to queue notify event work\n"); 1490 + goto done_err; 1491 + } 1492 + } 1493 + 1494 + return; 1495 + 1496 + done_err: 1497 + complete(&event->completion); 1498 + ceph_osdc_put_event(event); 1499 + return; 1500 + 1501 + bad: 1502 + pr_err("osdc handle_watch_notify corrupt msg\n"); 1503 + return; 1504 + } 1505 + 1506 + int 
ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout) 1507 + { 1508 + int err; 1509 + 1510 + dout("wait_event %p\n", event); 1511 + err = wait_for_completion_interruptible_timeout(&event->completion, 1512 + timeout * HZ); 1513 + ceph_osdc_put_event(event); 1514 + if (err > 0) 1515 + err = 0; 1516 + dout("wait_event %p returns %d\n", event, err); 1517 + return err; 1518 + } 1519 + EXPORT_SYMBOL(ceph_osdc_wait_event); 1429 1520 1430 1521 /* 1431 1522 * Register request, send initial attempt. ··· 1773 1430 INIT_LIST_HEAD(&osdc->req_lru); 1774 1431 INIT_LIST_HEAD(&osdc->req_unsent); 1775 1432 INIT_LIST_HEAD(&osdc->req_notarget); 1433 + INIT_LIST_HEAD(&osdc->req_linger); 1776 1434 osdc->num_requests = 0; 1777 1435 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1778 1436 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1437 + spin_lock_init(&osdc->event_lock); 1438 + osdc->event_tree = RB_ROOT; 1439 + osdc->event_count = 0; 1779 1440 1780 1441 schedule_delayed_work(&osdc->osds_timeout_work, 1781 1442 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); ··· 1799 1452 "osd_op_reply"); 1800 1453 if (err < 0) 1801 1454 goto out_msgpool; 1455 + 1456 + osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 1457 + if (IS_ERR(osdc->notify_wq)) { 1458 + err = PTR_ERR(osdc->notify_wq); 1459 + osdc->notify_wq = NULL; 1460 + goto out_msgpool; 1461 + } 1802 1462 return 0; 1803 1463 1804 1464 out_msgpool: ··· 1819 1465 1820 1466 void ceph_osdc_stop(struct ceph_osd_client *osdc) 1821 1467 { 1468 + flush_workqueue(osdc->notify_wq); 1469 + destroy_workqueue(osdc->notify_wq); 1822 1470 cancel_delayed_work_sync(&osdc->timeout_work); 1823 1471 cancel_delayed_work_sync(&osdc->osds_timeout_work); 1824 1472 if (osdc->osdmap) { ··· 1828 1472 osdc->osdmap = NULL; 1829 1473 } 1830 1474 remove_old_osds(osdc, 1); 1475 + WARN_ON(!RB_EMPTY_ROOT(&osdc->osds)); 1831 1476 mempool_destroy(osdc->req_mempool); 1832 1477 
ceph_msgpool_destroy(&osdc->msgpool_op); 1833 1478 ceph_msgpool_destroy(&osdc->msgpool_op_reply); ··· 1937 1580 case CEPH_MSG_OSD_OPREPLY: 1938 1581 handle_reply(osdc, msg, con); 1939 1582 break; 1583 + case CEPH_MSG_WATCH_NOTIFY: 1584 + handle_watch_notify(osdc, msg); 1585 + break; 1940 1586 1941 1587 default: 1942 1588 pr_err("received unknown message type %d %s\n", type, ··· 2033 1673 2034 1674 switch (type) { 2035 1675 case CEPH_MSG_OSD_MAP: 1676 + case CEPH_MSG_WATCH_NOTIFY: 2036 1677 return ceph_msg_new(type, front, GFP_NOFS); 2037 1678 case CEPH_MSG_OSD_OPREPLY: 2038 1679 return get_reply(con, hdr, skip);