net/ceph: make ceph_msgr_wq non-reentrant

The ceph messenger code does a rather complex dance around the
multithreaded workqueue to make sure the same work item isn't executed
concurrently on different CPUs. This guarantee can be provided by the
workqueue itself with WQ_NON_REENTRANT.
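
For illustration, a minimal sketch of creating such a workqueue
(example_wq and example_init() are hypothetical names, not part of
this patch):

    #include <linux/workqueue.h>

    static struct workqueue_struct *example_wq;    /* hypothetical */

    static int __init example_init(void)
    {
            /*
             * WQ_NON_REENTRANT guarantees that a given work item runs
             * on at most one CPU system-wide; max_active == 0 selects
             * the default concurrency level.
             */
            example_wq = alloc_workqueue("example", WQ_NON_REENTRANT, 0);
            if (!example_wq)
                    return -ENOMEM;
            return 0;
    }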

Make ceph_msgr_wq a non-reentrant workqueue with the default
concurrency level and remove the QUEUED/BUSY logic.
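
With the flag in place, the queueing path collapses to the usual
take-a-reference-then-queue pattern (condensed from the diff below):

    /* Condensed from the patch below.  queue_delayed_work() returns 0
     * if the work is already pending; the reference taken for the
     * queued work is dropped again in that case. */
    if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
            dout("queue_con %p - already queued\n", con);
            con->ops->put(con);
    }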

* This removes the backoff handling in con_work(), but that backoff
  couldn't reliably block execution of con_work() to begin with -
  queue_con() can be called after the work has started but before BUSY
  is set (see the interleaving sketch after this list). It seems that
  it was an optimization for a rather cold path and can be safely
  removed.

* The number of concurrent work items is bounded by the number of
  connections, and connections are independent from each other. With
  the default concurrency level, different connections will be
  executed independently.
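
The race window for the first point looks roughly like this
(illustrative interleaving, not code from the patch):

    /*
     * worker (con_work)                  another thread (queue_con)
     * -----------------                  --------------------------
     * work dequeued, con_work() entered
     *                                    set_bit(QUEUED)
     *                                    test_bit(BUSY) -> still clear
     *                                    queue_work() -> succeeds, the
     *                                      item is pending again
     * test_and_set_bit(BUSY)
     * ... fault, backoff = 1 ...
     * clear_bit(BUSY); QUEUED is set but
     * the loop is skipped due to backoff
     * -> the queued instance still runs
     *    con_work() again immediately
     */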

Signed-off-by: Tejun Heo <tj@kernel.org>
Cc: Sage Weil <sage@newdream.net>
Cc: ceph-devel@vger.kernel.org
Signed-off-by: Sage Weil <sage@newdream.net>


 include/linux/ceph/messenger.h |  5 -
 net/ceph/messenger.c           | 46 ++-------
 2 files changed, 2 insertions(+), 49 deletions(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -110,17 +110,12 @@
 
 /*
  * ceph_connection state bit flags
- *
- * QUEUED and BUSY are used together to ensure that only a single
- * thread is currently opening, reading or writing data to the socket.
  */
 #define LOSSYTX		0  /* we can close channel or drop messages on errors */
 #define CONNECTING	1
 #define NEGOTIATING	2
 #define KEEPALIVE_PENDING	3
 #define WRITE_PENDING	4  /* we have data ready to send */
-#define QUEUED		5  /* there is work queued on this connection */
-#define BUSY		6  /* work is being done */
 #define STANDBY		8  /* no outgoing messages, socket closed.  we keep
 			    * the ceph_connection around to maintain shared
 			    * state with the peer. */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -96,7 +96,7 @@
 
 int ceph_msgr_init(void)
 {
-	ceph_msgr_wq = create_workqueue("ceph-msgr");
+	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
 	if (!ceph_msgr_wq) {
 		pr_err("msgr_init failed to create workqueue\n");
 		return -ENOMEM;
@@ -1920,20 +1920,6 @@
 /*
  * Atomically queue work on a connection.  Bump @con reference to
  * avoid races with connection teardown.
- *
- * There is some trickery going on with QUEUED and BUSY because we
- * only want a _single_ thread operating on each connection at any
- * point in time, but we want to use all available CPUs.
- *
- * The worker thread only proceeds if it can atomically set BUSY.  It
- * clears QUEUED and does it's thing.  When it thinks it's done, it
- * clears BUSY, then rechecks QUEUED.. if it's set again, it loops
- * (tries again to set BUSY).
- *
- * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
- * try to queue work.  If that fails (work is already queued, or BUSY)
- * we give up (work also already being done or is queued) but leave QUEUED
- * set so that the worker thread will loop if necessary.
 */
 static void queue_con(struct ceph_connection *con)
 {
@@ -1948,11 +1934,7 @@
 		return;
 	}
 
-	set_bit(QUEUED, &con->state);
-	if (test_bit(BUSY, &con->state)) {
-		dout("queue_con %p - already BUSY\n", con);
-		con->ops->put(con);
-	} else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
+	if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
 		dout("queue_con %p - already queued\n", con);
 		con->ops->put(con);
 	} else {
@@ -1967,15 +1949,6 @@
 {
 	struct ceph_connection *con = container_of(work, struct ceph_connection,
 						   work.work);
-	int backoff = 0;
-
-more:
-	if (test_and_set_bit(BUSY, &con->state) != 0) {
-		dout("con_work %p BUSY already set\n", con);
-		goto out;
-	}
-	dout("con_work %p start, clearing QUEUED\n", con);
-	clear_bit(QUEUED, &con->state);
 
 	mutex_lock(&con->mutex);
 
@@ -1994,28 +1967,13 @@
 	    try_read(con) < 0 ||
 	    try_write(con) < 0) {
 		mutex_unlock(&con->mutex);
-		backoff = 1;
 		ceph_fault(con);	/* error/fault path */
 		goto done_unlocked;
 	}
 
 done:
 	mutex_unlock(&con->mutex);
-
 done_unlocked:
-	clear_bit(BUSY, &con->state);
-	dout("con->state=%lu\n", con->state);
-	if (test_bit(QUEUED, &con->state)) {
-		if (!backoff || test_bit(OPENING, &con->state)) {
-			dout("con_work %p QUEUED reset, looping\n", con);
-			goto more;
-		}
-		dout("con_work %p QUEUED reset, but just faulted\n", con);
-		clear_bit(QUEUED, &con->state);
-	}
-	dout("con_work %p done\n", con);
-
-out:
 	con->ops->put(con);
 }
 