Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v3.6-rc4 401 lines 10 kB view raw
1/* 2 * r2net.c 3 * 4 * Copyright (c) 2011, Dan Magenheimer, Oracle Corp. 5 * 6 * Ramster_r2net provides an interface between zcache and r2net. 7 * 8 * FIXME: support more than two nodes 9 */ 10 11#include <linux/list.h> 12#include "cluster/tcp.h" 13#include "cluster/nodemanager.h" 14#include "tmem.h" 15#include "zcache.h" 16#include "ramster.h" 17 18#define RAMSTER_TESTING 19 20#define RMSTR_KEY 0x77347734 21 22enum { 23 RMSTR_TMEM_PUT_EPH = 100, 24 RMSTR_TMEM_PUT_PERS, 25 RMSTR_TMEM_ASYNC_GET_REQUEST, 26 RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST, 27 RMSTR_TMEM_ASYNC_GET_REPLY, 28 RMSTR_TMEM_FLUSH, 29 RMSTR_TMEM_FLOBJ, 30 RMSTR_TMEM_DESTROY_POOL, 31}; 32 33#define RMSTR_R2NET_MAX_LEN \ 34 (R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle)) 35 36#include "cluster/tcp_internal.h" 37 38static struct r2nm_node *r2net_target_node; 39static int r2net_target_nodenum; 40 41int r2net_remote_target_node_set(int node_num) 42{ 43 int ret = -1; 44 45 r2net_target_node = r2nm_get_node_by_num(node_num); 46 if (r2net_target_node != NULL) { 47 r2net_target_nodenum = node_num; 48 r2nm_node_put(r2net_target_node); 49 ret = 0; 50 } 51 return ret; 52} 53 54/* FIXME following buffer should be per-cpu, protected by preempt_disable */ 55static char ramster_async_get_buf[R2NET_MAX_PAYLOAD_BYTES]; 56 57static int ramster_remote_async_get_request_handler(struct r2net_msg *msg, 58 u32 len, void *data, void **ret_data) 59{ 60 char *pdata; 61 struct tmem_xhandle xh; 62 int found; 63 size_t size = RMSTR_R2NET_MAX_LEN; 64 u16 msgtype = be16_to_cpu(msg->msg_type); 65 bool get_and_free = (msgtype == RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST); 66 unsigned long flags; 67 68 xh = *(struct tmem_xhandle *)msg->buf; 69 if (xh.xh_data_size > RMSTR_R2NET_MAX_LEN) 70 BUG(); 71 pdata = ramster_async_get_buf; 72 *(struct tmem_xhandle *)pdata = xh; 73 pdata += sizeof(struct tmem_xhandle); 74 local_irq_save(flags); 75 found = zcache_get(xh.client_id, xh.pool_id, &xh.oid, xh.index, 76 pdata, &size, 1, get_and_free ? 1 : -1); 77 local_irq_restore(flags); 78 if (found < 0) { 79 /* a zero size indicates the get failed */ 80 size = 0; 81 } 82 if (size > RMSTR_R2NET_MAX_LEN) 83 BUG(); 84 *ret_data = pdata - sizeof(struct tmem_xhandle); 85 /* now make caller (r2net_process_message) handle specially */ 86 r2net_force_data_magic(msg, RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY); 87 return size + sizeof(struct tmem_xhandle); 88} 89 90static int ramster_remote_async_get_reply_handler(struct r2net_msg *msg, 91 u32 len, void *data, void **ret_data) 92{ 93 char *in = (char *)msg->buf; 94 int datalen = len - sizeof(struct r2net_msg); 95 int ret = -1; 96 struct tmem_xhandle *xh = (struct tmem_xhandle *)in; 97 98 in += sizeof(struct tmem_xhandle); 99 datalen -= sizeof(struct tmem_xhandle); 100 BUG_ON(datalen < 0 || datalen > PAGE_SIZE); 101 ret = zcache_localify(xh->pool_id, &xh->oid, xh->index, 102 in, datalen, xh->extra); 103#ifdef RAMSTER_TESTING 104 if (ret == -EEXIST) 105 pr_err("TESTING ArrgREP, aborted overwrite on racy put\n"); 106#endif 107 return ret; 108} 109 110int ramster_remote_put_handler(struct r2net_msg *msg, 111 u32 len, void *data, void **ret_data) 112{ 113 struct tmem_xhandle *xh; 114 char *p = (char *)msg->buf; 115 int datalen = len - sizeof(struct r2net_msg) - 116 sizeof(struct tmem_xhandle); 117 u16 msgtype = be16_to_cpu(msg->msg_type); 118 bool ephemeral = (msgtype == RMSTR_TMEM_PUT_EPH); 119 unsigned long flags; 120 int ret; 121 122 xh = (struct tmem_xhandle *)p; 123 p += sizeof(struct tmem_xhandle); 124 zcache_autocreate_pool(xh->client_id, xh->pool_id, ephemeral); 125 local_irq_save(flags); 126 ret = zcache_put(xh->client_id, xh->pool_id, &xh->oid, xh->index, 127 p, datalen, 1, ephemeral ? 1 : -1); 128 local_irq_restore(flags); 129 return ret; 130} 131 132int ramster_remote_flush_handler(struct r2net_msg *msg, 133 u32 len, void *data, void **ret_data) 134{ 135 struct tmem_xhandle *xh; 136 char *p = (char *)msg->buf; 137 138 xh = (struct tmem_xhandle *)p; 139 p += sizeof(struct tmem_xhandle); 140 (void)zcache_flush(xh->client_id, xh->pool_id, &xh->oid, xh->index); 141 return 0; 142} 143 144int ramster_remote_flobj_handler(struct r2net_msg *msg, 145 u32 len, void *data, void **ret_data) 146{ 147 struct tmem_xhandle *xh; 148 char *p = (char *)msg->buf; 149 150 xh = (struct tmem_xhandle *)p; 151 p += sizeof(struct tmem_xhandle); 152 (void)zcache_flush_object(xh->client_id, xh->pool_id, &xh->oid); 153 return 0; 154} 155 156int ramster_remote_async_get(struct tmem_xhandle *xh, bool free, int remotenode, 157 size_t expect_size, uint8_t expect_cksum, 158 void *extra) 159{ 160 int ret = -1, status; 161 struct r2nm_node *node = NULL; 162 struct kvec vec[1]; 163 size_t veclen = 1; 164 u32 msg_type; 165 166 node = r2nm_get_node_by_num(remotenode); 167 if (node == NULL) 168 goto out; 169 xh->client_id = r2nm_this_node(); /* which node is getting */ 170 xh->xh_data_cksum = expect_cksum; 171 xh->xh_data_size = expect_size; 172 xh->extra = extra; 173 vec[0].iov_len = sizeof(*xh); 174 vec[0].iov_base = xh; 175 if (free) 176 msg_type = RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST; 177 else 178 msg_type = RMSTR_TMEM_ASYNC_GET_REQUEST; 179 ret = r2net_send_message_vec(msg_type, RMSTR_KEY, 180 vec, veclen, remotenode, &status); 181 r2nm_node_put(node); 182 if (ret < 0) { 183 /* FIXME handle bad message possibilities here? */ 184 pr_err("UNTESTED ret<0 in ramster_remote_async_get\n"); 185 } 186 ret = status; 187out: 188 return ret; 189} 190 191#ifdef RAMSTER_TESTING 192/* leave me here to see if it catches a weird crash */ 193static void ramster_check_irq_counts(void) 194{ 195 static int last_hardirq_cnt, last_softirq_cnt, last_preempt_cnt; 196 int cur_hardirq_cnt, cur_softirq_cnt, cur_preempt_cnt; 197 198 cur_hardirq_cnt = hardirq_count() >> HARDIRQ_SHIFT; 199 if (cur_hardirq_cnt > last_hardirq_cnt) { 200 last_hardirq_cnt = cur_hardirq_cnt; 201 if (!(last_hardirq_cnt&(last_hardirq_cnt-1))) 202 pr_err("RAMSTER TESTING RRP hardirq_count=%d\n", 203 last_hardirq_cnt); 204 } 205 cur_softirq_cnt = softirq_count() >> SOFTIRQ_SHIFT; 206 if (cur_softirq_cnt > last_softirq_cnt) { 207 last_softirq_cnt = cur_softirq_cnt; 208 if (!(last_softirq_cnt&(last_softirq_cnt-1))) 209 pr_err("RAMSTER TESTING RRP softirq_count=%d\n", 210 last_softirq_cnt); 211 } 212 cur_preempt_cnt = preempt_count() & PREEMPT_MASK; 213 if (cur_preempt_cnt > last_preempt_cnt) { 214 last_preempt_cnt = cur_preempt_cnt; 215 if (!(last_preempt_cnt&(last_preempt_cnt-1))) 216 pr_err("RAMSTER TESTING RRP preempt_count=%d\n", 217 last_preempt_cnt); 218 } 219} 220#endif 221 222int ramster_remote_put(struct tmem_xhandle *xh, char *data, size_t size, 223 bool ephemeral, int *remotenode) 224{ 225 int nodenum, ret = -1, status; 226 struct r2nm_node *node = NULL; 227 struct kvec vec[2]; 228 size_t veclen = 2; 229 u32 msg_type; 230#ifdef RAMSTER_TESTING 231 struct r2net_node *nn; 232#endif 233 234 BUG_ON(size > RMSTR_R2NET_MAX_LEN); 235 xh->client_id = r2nm_this_node(); /* which node is putting */ 236 vec[0].iov_len = sizeof(*xh); 237 vec[0].iov_base = xh; 238 vec[1].iov_len = size; 239 vec[1].iov_base = data; 240 node = r2net_target_node; 241 if (!node) 242 goto out; 243 244 nodenum = r2net_target_nodenum; 245 246 r2nm_node_get(node); 247 248#ifdef RAMSTER_TESTING 249 nn = r2net_nn_from_num(nodenum); 250 WARN_ON_ONCE(nn->nn_persistent_error || !nn->nn_sc_valid); 251#endif 252 253 if (ephemeral) 254 msg_type = RMSTR_TMEM_PUT_EPH; 255 else 256 msg_type = RMSTR_TMEM_PUT_PERS; 257#ifdef RAMSTER_TESTING 258 /* leave me here to see if it catches a weird crash */ 259 ramster_check_irq_counts(); 260#endif 261 262 ret = r2net_send_message_vec(msg_type, RMSTR_KEY, vec, veclen, 263 nodenum, &status); 264#ifdef RAMSTER_TESTING 265 if (ret != 0) { 266 static unsigned long cnt; 267 cnt++; 268 if (!(cnt&(cnt-1))) 269 pr_err("ramster_remote_put: message failed, ret=%d, cnt=%lu\n", 270 ret, cnt); 271 ret = -1; 272 } 273#endif 274 if (ret < 0) 275 ret = -1; 276 else { 277 ret = status; 278 *remotenode = nodenum; 279 } 280 281 r2nm_node_put(node); 282out: 283 return ret; 284} 285 286int ramster_remote_flush(struct tmem_xhandle *xh, int remotenode) 287{ 288 int ret = -1, status; 289 struct r2nm_node *node = NULL; 290 struct kvec vec[1]; 291 size_t veclen = 1; 292 293 node = r2nm_get_node_by_num(remotenode); 294 BUG_ON(node == NULL); 295 xh->client_id = r2nm_this_node(); /* which node is flushing */ 296 vec[0].iov_len = sizeof(*xh); 297 vec[0].iov_base = xh; 298 BUG_ON(irqs_disabled()); 299 BUG_ON(in_softirq()); 300 ret = r2net_send_message_vec(RMSTR_TMEM_FLUSH, RMSTR_KEY, 301 vec, veclen, remotenode, &status); 302 r2nm_node_put(node); 303 return ret; 304} 305 306int ramster_remote_flush_object(struct tmem_xhandle *xh, int remotenode) 307{ 308 int ret = -1, status; 309 struct r2nm_node *node = NULL; 310 struct kvec vec[1]; 311 size_t veclen = 1; 312 313 node = r2nm_get_node_by_num(remotenode); 314 BUG_ON(node == NULL); 315 xh->client_id = r2nm_this_node(); /* which node is flobjing */ 316 vec[0].iov_len = sizeof(*xh); 317 vec[0].iov_base = xh; 318 ret = r2net_send_message_vec(RMSTR_TMEM_FLOBJ, RMSTR_KEY, 319 vec, veclen, remotenode, &status); 320 r2nm_node_put(node); 321 return ret; 322} 323 324/* 325 * Handler registration 326 */ 327 328static LIST_HEAD(r2net_unreg_list); 329 330static void r2net_unregister_handlers(void) 331{ 332 r2net_unregister_handler_list(&r2net_unreg_list); 333} 334 335int r2net_register_handlers(void) 336{ 337 int status; 338 339 status = r2net_register_handler(RMSTR_TMEM_PUT_EPH, RMSTR_KEY, 340 RMSTR_R2NET_MAX_LEN, 341 ramster_remote_put_handler, 342 NULL, NULL, &r2net_unreg_list); 343 if (status) 344 goto bail; 345 346 status = r2net_register_handler(RMSTR_TMEM_PUT_PERS, RMSTR_KEY, 347 RMSTR_R2NET_MAX_LEN, 348 ramster_remote_put_handler, 349 NULL, NULL, &r2net_unreg_list); 350 if (status) 351 goto bail; 352 353 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REQUEST, RMSTR_KEY, 354 RMSTR_R2NET_MAX_LEN, 355 ramster_remote_async_get_request_handler, 356 NULL, NULL, 357 &r2net_unreg_list); 358 if (status) 359 goto bail; 360 361 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST, 362 RMSTR_KEY, RMSTR_R2NET_MAX_LEN, 363 ramster_remote_async_get_request_handler, 364 NULL, NULL, 365 &r2net_unreg_list); 366 if (status) 367 goto bail; 368 369 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY, 370 RMSTR_R2NET_MAX_LEN, 371 ramster_remote_async_get_reply_handler, 372 NULL, NULL, 373 &r2net_unreg_list); 374 if (status) 375 goto bail; 376 377 status = r2net_register_handler(RMSTR_TMEM_FLUSH, RMSTR_KEY, 378 RMSTR_R2NET_MAX_LEN, 379 ramster_remote_flush_handler, 380 NULL, NULL, 381 &r2net_unreg_list); 382 if (status) 383 goto bail; 384 385 status = r2net_register_handler(RMSTR_TMEM_FLOBJ, RMSTR_KEY, 386 RMSTR_R2NET_MAX_LEN, 387 ramster_remote_flobj_handler, 388 NULL, NULL, 389 &r2net_unreg_list); 390 if (status) 391 goto bail; 392 393 pr_info("ramster: r2net handlers registered\n"); 394 395bail: 396 if (status) { 397 r2net_unregister_handlers(); 398 pr_err("ramster: couldn't register r2net handlers\n"); 399 } 400 return status; 401}