8e6a76cf18054f65e8a9f6be0a7b188a838e935b
[powerpc.git] / fs / dlm / lowcomms-tcp.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 /*
15  * lowcomms.c
16  *
17  * This is the "low-level" comms layer.
18  *
19  * It is responsible for sending/receiving messages
20  * from other nodes in the cluster.
21  *
22  * Cluster nodes are referred to by their nodeids. nodeids are
23  * simply 32 bit numbers to the locking module - if they need to
24  * be expanded for the cluster infrastructure then that is it's
25  * responsibility. It is this layer's
26  * responsibility to resolve these into IP address or
27  * whatever it needs for inter-node communication.
28  *
29  * The comms level is two kernel threads that deal mainly with
30  * the receiving of messages from other nodes and passing them
31  * up to the mid-level comms layer (which understands the
32  * message format) for execution by the locking core, and
33  * a send thread which does all the setting up of connections
34  * to remote nodes and the sending of data. Threads are not allowed
35  * to send their own data because it may cause them to wait in times
36  * of high load. Also, this way, the sending thread can collect together
37  * messages bound for one node and send them in one block.
38  *
39  * I don't see any problem with the recv thread executing the locking
40  * code on behalf of remote processes as the locking code is
41  * short, efficient and never waits.
42  *
43  */
44
45
46 #include <asm/ioctls.h>
47 #include <net/sock.h>
48 #include <net/tcp.h>
49 #include <linux/pagemap.h>
50
51 #include "dlm_internal.h"
52 #include "lowcomms.h"
53 #include "midcomms.h"
54 #include "config.h"
55
/*
 * Circular-buffer bookkeeping for the per-connection receive page.
 * "base" is the read offset, "len" the number of valid bytes, and
 * "mask" is size-1; the masking arithmetic below requires the size
 * to be a power of two (PAGE_CACHE_SIZE in practice).
 */
struct cbuf {
        unsigned int base;
        unsigned int len;
        unsigned int mask;
};

/* Step by which the connections[] array grows in nodeid2con() */
#define NODE_INCREMENT 32
/* Account for n newly received bytes */
static void cbuf_add(struct cbuf *cb, int n)
{
        cb->len += n;
}

/* Offset of the first free byte (the current write position) */
static int cbuf_data(struct cbuf *cb)
{
        return ((cb->base + cb->len) & cb->mask);
}

/* Reset the buffer; size must be a power of two */
static void cbuf_init(struct cbuf *cb, int size)
{
        cb->base = cb->len = 0;
        cb->mask = size-1;
}

/* Consume n bytes from the front, wrapping base around the buffer */
static void cbuf_eat(struct cbuf *cb, int n)
{
        cb->len  -= n;
        cb->base += n;
        cb->base &= cb->mask;
}

/* True when no unconsumed data remains */
static bool cbuf_empty(struct cbuf *cb)
{
        return cb->len == 0;
}
90
/* Maximum number of incoming messages to process before
   doing a cond_resched()
*/
#define MAX_RX_MSG_COUNT 25

/*
 * Per-nodeid connection state.  One of these exists for every node we
 * talk to (plus nodeid 0 for the listening socket).  sock_sem
 * serialises connect/accept/close against the send and receive paths.
 */
struct connection {
        struct socket *sock;    /* NULL if not connected */
        uint32_t nodeid;        /* So we know who we are in the list */
        struct rw_semaphore sock_sem; /* Stop connect races */
        unsigned long flags;    /* bit 1,2 = We are on the read/write lists */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_IS_OTHERCON 4
        struct list_head writequeue;  /* List of outgoing writequeue_entries */
        struct list_head listenlist;  /* List of allocated listening sockets */
        spinlock_t writequeue_lock;
        int (*rx_action) (struct connection *); /* What to do when active */
        struct page *rx_page;         /* Circular receive buffer page */
        struct cbuf cb;               /* Bookkeeping for rx_page */
        int retries;                  /* Connect attempts so far */
        atomic_t waiting_requests;
#define MAX_CONNECT_RETRIES 3
        struct connection *othercon;  /* Second connection when both ends
                                         connected simultaneously */
        struct work_struct rwork; /* Receive workqueue */
        struct work_struct swork; /* Send workqueue */
};
/* The owning connection is stashed in the socket's sk_user_data */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
119
/* An entry waiting to be sent: one page of outgoing data.  "offset"
   and "len" describe the unsent region; "end" is where the next
   dlm_lowcomms_get_buffer() allocation starts; "users" counts buffers
   handed out but not yet committed. */
struct writequeue_entry {
        struct list_head list;
        struct page *page;
        int offset;
        int len;
        int end;
        int users;
        struct connection *con;
};
130
/* Our single local address; set once at startup (no multi-homing) */
static struct sockaddr_storage dlm_local_addr;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

/* An array of pointers to connections, indexed by NODEID */
static struct connection **connections;
/* Guards growth/lookup of connections[] in nodeid2con() */
static DECLARE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;
static int conn_array_size;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
145
/*
 * Return the connection struct for a nodeid, growing the connections[]
 * array and allocating the struct on demand.  With allocation == 0
 * this is a pure lookup and may return NULL.  Serialised by the
 * connections_lock semaphore.
 */
static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
        struct connection *con = NULL;

        down(&connections_lock);
        if (nodeid >= conn_array_size) {
                /* Grow in NODE_INCREMENT steps: copy the old pointer
                   array into a larger zeroed one and free the old. */
                int new_size = nodeid + NODE_INCREMENT;
                struct connection **new_conns;

                new_conns = kzalloc(sizeof(struct connection *) *
                                    new_size, allocation);
                if (!new_conns)
                        goto finish;

                memcpy(new_conns, connections,  sizeof(struct connection *) * conn_array_size);
                conn_array_size = new_size;
                kfree(connections);
                connections = new_conns;

        }

        con = connections[nodeid];
        if (con == NULL && allocation) {
                con = kmem_cache_zalloc(con_cache, allocation);
                if (!con)
                        goto finish;

                con->nodeid = nodeid;
                init_rwsem(&con->sock_sem);
                INIT_LIST_HEAD(&con->writequeue);
                spin_lock_init(&con->writequeue_lock);
                INIT_WORK(&con->swork, process_send_sockets);
                INIT_WORK(&con->rwork, process_recv_sockets);

                connections[nodeid] = con;
        }

finish:
        up(&connections_lock);
        return con;
}
187
/* Data available on socket or listen socket received a connect */
/* Runs in softirq context; just flags the connection and queues the
   receive work.  NOTE(review): con is not checked for NULL here —
   presumably sk_user_data is always set before this callback can
   fire; verify against add_sock()/teardown ordering. */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
        struct connection *con = sock2con(sk);

        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
                queue_work(recv_workqueue, &con->rwork);
}
196
/* Socket has write space again: queue the send work unless a send
   is already pending (CF_WRITE_PENDING acts as a dedupe flag). */
static void lowcomms_write_space(struct sock *sk)
{
        struct connection *con = sock2con(sk);

        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
                queue_work(send_workqueue, &con->swork);
}
204
/* Ask the send worker to (re)establish this connection; the worker
   checks CF_CONNECT_PENDING and calls connect_to_sock(). */
static inline void lowcomms_connect_sock(struct connection *con)
{
        if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
                queue_work(send_workqueue, &con->swork);
}
210
211 static void lowcomms_state_change(struct sock *sk)
212 {
213         if (sk->sk_state == TCP_ESTABLISHED)
214                 lowcomms_write_space(sk);
215 }
216
217 /* Make a socket active */
218 static int add_sock(struct socket *sock, struct connection *con)
219 {
220         con->sock = sock;
221
222         /* Install a data_ready callback */
223         con->sock->sk->sk_data_ready = lowcomms_data_ready;
224         con->sock->sk->sk_write_space = lowcomms_write_space;
225         con->sock->sk->sk_state_change = lowcomms_state_change;
226
227         return 0;
228 }
229
230 /* Add the port number to an IP6 or 4 sockaddr and return the address
231    length */
232 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
233                           int *addr_len)
234 {
235         saddr->ss_family =  dlm_local_addr.ss_family;
236         if (saddr->ss_family == AF_INET) {
237                 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
238                 in4_addr->sin_port = cpu_to_be16(port);
239                 *addr_len = sizeof(struct sockaddr_in);
240         } else {
241                 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
242                 in6_addr->sin6_port = cpu_to_be16(port);
243                 *addr_len = sizeof(struct sockaddr_in6);
244         }
245 }
246
/* Close a remote connection and tidy up */
/* Takes sock_sem for writing; recurses once (and only once) into the
   paired "othercon" when and_other is true.  Frees the receive page
   and resets the connect retry counter. */
static void close_connection(struct connection *con, bool and_other)
{
        down_write(&con->sock_sem);

        if (con->sock) {
                sock_release(con->sock);
                con->sock = NULL;
        }
        if (con->othercon && and_other) {
                /* Will only re-enter once. */
                close_connection(con->othercon, false);
        }
        if (con->rx_page) {
                __free_page(con->rx_page);
                con->rx_page = NULL;
        }
        con->retries = 0;
        up_write(&con->sock_sem);
}
267
268 /* Data received from remote end */
269 static int receive_from_sock(struct connection *con)
270 {
271         int ret = 0;
272         struct msghdr msg;
273         struct iovec iov[2];
274         mm_segment_t fs;
275         unsigned len;
276         int r;
277         int call_again_soon = 0;
278
279         down_read(&con->sock_sem);
280
281         if (con->sock == NULL)
282                 goto out;
283         if (con->rx_page == NULL) {
284                 /*
285                  * This doesn't need to be atomic, but I think it should
286                  * improve performance if it is.
287                  */
288                 con->rx_page = alloc_page(GFP_ATOMIC);
289                 if (con->rx_page == NULL)
290                         goto out_resched;
291                 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
292         }
293
294         msg.msg_control = NULL;
295         msg.msg_controllen = 0;
296         msg.msg_iovlen = 1;
297         msg.msg_iov = iov;
298         msg.msg_name = NULL;
299         msg.msg_namelen = 0;
300         msg.msg_flags = 0;
301
302         /*
303          * iov[0] is the bit of the circular buffer between the current end
304          * point (cb.base + cb.len) and the end of the buffer.
305          */
306         iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
307         iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
308         iov[1].iov_len = 0;
309
310         /*
311          * iov[1] is the bit of the circular buffer between the start of the
312          * buffer and the start of the currently used section (cb.base)
313          */
314         if (cbuf_data(&con->cb) >= con->cb.base) {
315                 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
316                 iov[1].iov_len = con->cb.base;
317                 iov[1].iov_base = page_address(con->rx_page);
318                 msg.msg_iovlen = 2;
319         }
320         len = iov[0].iov_len + iov[1].iov_len;
321
322         fs = get_fs();
323         set_fs(get_ds());
324         r = ret = sock_recvmsg(con->sock, &msg, len,
325                                MSG_DONTWAIT | MSG_NOSIGNAL);
326         set_fs(fs);
327
328         if (ret <= 0)
329                 goto out_close;
330         if (ret == -EAGAIN)
331                 goto out_resched;
332
333         if (ret == len)
334                 call_again_soon = 1;
335         cbuf_add(&con->cb, ret);
336         ret = dlm_process_incoming_buffer(con->nodeid,
337                                           page_address(con->rx_page),
338                                           con->cb.base, con->cb.len,
339                                           PAGE_CACHE_SIZE);
340         if (ret == -EBADMSG) {
341                 printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
342                        "iov_len=%u, iov_base[0]=%p, read=%d\n",
343                        page_address(con->rx_page), con->cb.base, con->cb.len,
344                        len, iov[0].iov_base, r);
345         }
346         if (ret < 0)
347                 goto out_close;
348         cbuf_eat(&con->cb, ret);
349
350         if (cbuf_empty(&con->cb) && !call_again_soon) {
351                 __free_page(con->rx_page);
352                 con->rx_page = NULL;
353         }
354
355 out:
356         if (call_again_soon)
357                 goto out_resched;
358         up_read(&con->sock_sem);
359         return 0;
360
361 out_resched:
362         if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
363                 queue_work(recv_workqueue, &con->rwork);
364         up_read(&con->sock_sem);
365         return -EAGAIN;
366
367 out_close:
368         up_read(&con->sock_sem);
369         if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
370                 close_connection(con, false);
371                 /* Reconnect when there is something to send */
372         }
373
374         return ret;
375 }
376
/* Listening socket is busy, accept a connection */
/*
 * Accept an incoming connection on the listening socket, resolve the
 * peer to a cluster nodeid and attach the new socket to that node's
 * connection (or its "othercon" if both ends connected at once).
 * Lock classes (_nested) are used because we hold the listening
 * connection's sock_sem while taking the peer connection's.
 */
static int accept_from_sock(struct connection *con)
{
        int result;
        struct sockaddr_storage peeraddr;
        struct socket *newsock;
        int len;
        int nodeid;
        struct connection *newcon;
        struct connection *addcon;

        memset(&peeraddr, 0, sizeof(peeraddr));
        result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
                                  IPPROTO_TCP, &newsock);
        if (result < 0)
                return -ENOMEM;

        down_read_nested(&con->sock_sem, 0);

        result = -ENOTCONN;
        if (con->sock == NULL)
                goto accept_err;

        newsock->type = con->sock->type;
        newsock->ops = con->sock->ops;

        result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
        if (result < 0)
                goto accept_err;

        /* Get the connected socket's peer */
        memset(&peeraddr, 0, sizeof(peeraddr));
        if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
                                  &len, 2)) {
                result = -ECONNABORTED;
                goto accept_err;
        }

        /* Get the new node's NODEID */
        make_sockaddr(&peeraddr, 0, &len);
        if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
                printk("dlm: connect from non cluster node\n");
                sock_release(newsock);
                up_read(&con->sock_sem);
                return -1;
        }

        log_print("got connection from %d", nodeid);

        /*  Check to see if we already have a connection to this node. This
         *  could happen if the two nodes initiate a connection at roughly
         *  the same time and the connections cross on the wire.
         * TEMPORARY FIX:
         *  In this case we store the incoming one in "othercon"
         */
        newcon = nodeid2con(nodeid, GFP_KERNEL);
        if (!newcon) {
                result = -ENOMEM;
                goto accept_err;
        }
        down_write_nested(&newcon->sock_sem, 1);
        if (newcon->sock) {
                struct connection *othercon = newcon->othercon;

                if (!othercon) {
                        /* Lazily allocate the secondary connection */
                        othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
                        if (!othercon) {
                                printk("dlm: failed to allocate incoming socket\n");
                                up_write(&newcon->sock_sem);
                                result = -ENOMEM;
                                goto accept_err;
                        }
                        othercon->nodeid = nodeid;
                        othercon->rx_action = receive_from_sock;
                        init_rwsem(&othercon->sock_sem);
                        INIT_WORK(&othercon->swork, process_send_sockets);
                        INIT_WORK(&othercon->rwork, process_recv_sockets);
                        set_bit(CF_IS_OTHERCON, &othercon->flags);
                        newcon->othercon = othercon;
                }
                othercon->sock = newsock;
                newsock->sk->sk_user_data = othercon;
                add_sock(newsock, othercon);
                addcon = othercon;
        }
        else {
                newsock->sk->sk_user_data = newcon;
                newcon->rx_action = receive_from_sock;
                add_sock(newsock, newcon);
                addcon = newcon;
        }

        up_write(&newcon->sock_sem);

        /*
         * Add it to the active queue in case we got data
         * beween processing the accept adding the socket
         * to the read_sockets list
         */
        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
                queue_work(recv_workqueue, &addcon->rwork);
        up_read(&con->sock_sem);

        return 0;

accept_err:
        up_read(&con->sock_sem);
        sock_release(newsock);

        if (result != -EAGAIN)
                printk("dlm: error accepting connection from node: %d\n", result);
        return result;
}
490
491 /* Connect a new socket to its peer */
492 static void connect_to_sock(struct connection *con)
493 {
494         int result = -EHOSTUNREACH;
495         struct sockaddr_storage saddr;
496         int addr_len;
497         struct socket *sock;
498
499         if (con->nodeid == 0) {
500                 log_print("attempt to connect sock 0 foiled");
501                 return;
502         }
503
504         down_write(&con->sock_sem);
505         if (con->retries++ > MAX_CONNECT_RETRIES)
506                 goto out;
507
508         /* Some odd races can cause double-connects, ignore them */
509         if (con->sock) {
510                 result = 0;
511                 goto out;
512         }
513
514         /* Create a socket to communicate with */
515         result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
516                                   IPPROTO_TCP, &sock);
517         if (result < 0)
518                 goto out_err;
519
520         memset(&saddr, 0, sizeof(saddr));
521         if (dlm_nodeid_to_addr(con->nodeid, &saddr))
522                 goto out_err;
523
524         sock->sk->sk_user_data = con;
525         con->rx_action = receive_from_sock;
526
527         make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
528
529         add_sock(sock, con);
530
531         log_print("connecting to %d", con->nodeid);
532         result =
533                 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
534                                    O_NONBLOCK);
535         if (result == -EINPROGRESS)
536                 result = 0;
537         if (result == 0)
538                 goto out;
539
540 out_err:
541         if (con->sock) {
542                 sock_release(con->sock);
543                 con->sock = NULL;
544         }
545         /*
546          * Some errors are fatal and this list might need adjusting. For other
547          * errors we try again until the max number of retries is reached.
548          */
549         if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
550             result != -ENETDOWN && result != EINVAL
551             && result != -EPROTONOSUPPORT) {
552                 lowcomms_connect_sock(con);
553                 result = 0;
554         }
555 out:
556         up_write(&con->sock_sem);
557         return;
558 }
559
/*
 * Create, bind and listen on the cluster TCP port.  Returns the
 * listening socket, or NULL on any failure (errors are logged; bind
 * and listen failures release the socket before returning).
 * SO_REUSEADDR/SO_KEEPALIVE setsockopt failures are non-fatal.
 */
static struct socket *create_listen_sock(struct connection *con,
                                         struct sockaddr_storage *saddr)
{
        struct socket *sock = NULL;
        mm_segment_t fs;
        int result = 0;
        int one = 1;
        int addr_len;

        if (dlm_local_addr.ss_family == AF_INET)
                addr_len = sizeof(struct sockaddr_in);
        else
                addr_len = sizeof(struct sockaddr_in6);

        /* Create a socket to communicate with */
        result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
        if (result < 0) {
                printk("dlm: Can't create listening comms socket\n");
                goto create_out;
        }

        /* set_fs dance because sock_setsockopt expects a user pointer */
        fs = get_fs();
        set_fs(get_ds());
        result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
                                 (char *)&one, sizeof(one));
        set_fs(fs);
        if (result < 0) {
                printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
                       result);
        }
        sock->sk->sk_user_data = con;
        con->rx_action = accept_from_sock;
        con->sock = sock;

        /* Bind to our port */
        make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
        if (result < 0) {
                printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port);
                sock_release(sock);
                sock = NULL;
                con->sock = NULL;
                goto create_out;
        }

        fs = get_fs();
        set_fs(get_ds());

        result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
                                 (char *)&one, sizeof(one));
        set_fs(fs);
        if (result < 0) {
                printk("dlm: Set keepalive failed: %d\n", result);
        }

        result = sock->ops->listen(sock, 5);
        if (result < 0) {
                printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port);
                sock_release(sock);
                sock = NULL;
                goto create_out;
        }

create_out:
        return sock;
}
626
627
628 /* Listen on all interfaces */
629 static int listen_for_all(void)
630 {
631         struct socket *sock = NULL;
632         struct connection *con = nodeid2con(0, GFP_KERNEL);
633         int result = -EINVAL;
634
635         /* We don't support multi-homed hosts */
636         set_bit(CF_IS_OTHERCON, &con->flags);
637
638         sock = create_listen_sock(con, &dlm_local_addr);
639         if (sock) {
640                 add_sock(sock, con);
641                 result = 0;
642         }
643         else {
644                 result = -EADDRINUSE;
645         }
646
647         return result;
648 }
649
650
651
652 static struct writequeue_entry *new_writequeue_entry(struct connection *con,
653                                                      gfp_t allocation)
654 {
655         struct writequeue_entry *entry;
656
657         entry = kmalloc(sizeof(struct writequeue_entry), allocation);
658         if (!entry)
659                 return NULL;
660
661         entry->page = alloc_page(allocation);
662         if (!entry->page) {
663                 kfree(entry);
664                 return NULL;
665         }
666
667         entry->offset = 0;
668         entry->len = 0;
669         entry->end = 0;
670         entry->users = 0;
671         entry->con = con;
672
673         return entry;
674 }
675
/*
 * Reserve len bytes of outgoing buffer space for nodeid and return an
 * opaque handle (the writequeue entry) plus, via *ppc, a pointer the
 * caller writes the message into.  The reservation is completed with
 * dlm_lowcomms_commit_buffer().  Space is carved from the last entry
 * on the writequeue if it fits, otherwise a new page-sized entry is
 * allocated.  Returns NULL on allocation failure.
 */
void *dlm_lowcomms_get_buffer(int nodeid, int len,
                              gfp_t allocation, char **ppc)
{
        struct connection *con;
        struct writequeue_entry *e;
        int offset = 0;
        int users = 0;

        con = nodeid2con(nodeid, allocation);
        if (!con)
                return NULL;

        spin_lock(&con->writequeue_lock);
        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
        /* Empty list, or not enough room left in the tail entry */
        if ((&e->list == &con->writequeue) ||
            (PAGE_CACHE_SIZE - e->end < len)) {
                e = NULL;
        } else {
                offset = e->end;
                e->end += len;
                users = e->users++;
        }
        spin_unlock(&con->writequeue_lock);

        if (e) {
        got_one:
                /* First user of this entry maps the page */
                if (users == 0)
                        kmap(e->page);
                *ppc = page_address(e->page) + offset;
                return e;
        }

        e = new_writequeue_entry(con, allocation);
        if (e) {
                spin_lock(&con->writequeue_lock);
                offset = e->end;
                e->end += len;
                users = e->users++;
                list_add_tail(&e->list, &con->writequeue);
                spin_unlock(&con->writequeue_lock);
                goto got_one;
        }
        return NULL;
}
720
/*
 * Commit a buffer previously handed out by dlm_lowcomms_get_buffer().
 * When the last outstanding user commits, the entry's send length is
 * fixed up, the page unmapped and the send worker queued.
 */
void dlm_lowcomms_commit_buffer(void *mh)
{
        struct writequeue_entry *e = (struct writequeue_entry *)mh;
        struct connection *con = e->con;
        int users;

        spin_lock(&con->writequeue_lock);
        users = --e->users;
        if (users)
                goto out;       /* other writers still pending */
        e->len = e->end - e->offset;
        kunmap(e->page);
        spin_unlock(&con->writequeue_lock);

        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
                queue_work(send_workqueue, &con->swork);
        }
        return;

out:
        spin_unlock(&con->writequeue_lock);
        return;
}
744
745 static void free_entry(struct writequeue_entry *e)
746 {
747         __free_page(e->page);
748         kfree(e);
749 }
750
/* Send a message */
/*
 * Drain the connection's writequeue through sendpage().  The
 * writequeue_lock is dropped around the (possibly sleeping) page send
 * and retaken afterwards.  On a hard send error the connection is
 * closed and a reconnect queued; if no socket exists yet, a connect
 * is attempted instead.
 */
static void send_to_sock(struct connection *con)
{
        int ret = 0;
        ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
        struct writequeue_entry *e;
        int len, offset;

        down_read(&con->sock_sem);
        if (con->sock == NULL)
                goto out_connect;

        sendpage = con->sock->ops->sendpage;

        spin_lock(&con->writequeue_lock);
        for (;;) {
                e = list_entry(con->writequeue.next, struct writequeue_entry,
                               list);
                if ((struct list_head *) e == &con->writequeue)
                        break;  /* queue empty */

                len = e->len;
                offset = e->offset;
                BUG_ON(len == 0 && e->users == 0);
                spin_unlock(&con->writequeue_lock);
                kmap(e->page);

                ret = 0;
                if (len) {
                        ret = sendpage(con->sock, e->page, offset, len,
                                       msg_flags);
                        if (ret == -EAGAIN || ret == 0)
                                goto out;       /* would block; retry later */
                        if (ret <= 0)
                                goto send_error;
                }
                else {
                        /* Don't starve people filling buffers */
                        cond_resched();
                }

                spin_lock(&con->writequeue_lock);
                e->offset += ret;
                e->len -= ret;

                /* Fully sent and no writers left: retire the entry */
                if (e->len == 0 && e->users == 0) {
                        list_del(&e->list);
                        kunmap(e->page);
                        free_entry(e);
                        continue;
                }
        }
        spin_unlock(&con->writequeue_lock);
out:
        up_read(&con->sock_sem);
        return;

send_error:
        up_read(&con->sock_sem);
        close_connection(con, false);
        lowcomms_connect_sock(con);
        return;

out_connect:
        up_read(&con->sock_sem);
        connect_to_sock(con);
        return;
}
820
821 static void clean_one_writequeue(struct connection *con)
822 {
823         struct list_head *list;
824         struct list_head *temp;
825
826         spin_lock(&con->writequeue_lock);
827         list_for_each_safe(list, temp, &con->writequeue) {
828                 struct writequeue_entry *e =
829                         list_entry(list, struct writequeue_entry, list);
830                 list_del(&e->list);
831                 free_entry(e);
832         }
833         spin_unlock(&con->writequeue_lock);
834 }
835
836 /* Called from recovery when it knows that a node has
837    left the cluster */
838 int dlm_lowcomms_close(int nodeid)
839 {
840         struct connection *con;
841
842         if (!connections)
843                 goto out;
844
845         log_print("closing connection to node %d", nodeid);
846         con = nodeid2con(nodeid, 0);
847         if (con) {
848                 clean_one_writequeue(con);
849                 close_connection(con, true);
850                 atomic_set(&con->waiting_requests, 0);
851         }
852         return 0;
853
854 out:
855         return -1;
856 }
857
858 /* Look for activity on active sockets */
859 static void process_recv_sockets(struct work_struct *work)
860 {
861         struct connection *con = container_of(work, struct connection, rwork);
862         int err;
863
864         clear_bit(CF_READ_PENDING, &con->flags);
865         do {
866                 err = con->rx_action(con);
867         } while (!err);
868 }
869
870
871 static void process_send_sockets(struct work_struct *work)
872 {
873         struct connection *con = container_of(work, struct connection, swork);
874
875         if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
876                 connect_to_sock(con);
877         }
878
879         clear_bit(CF_WRITE_PENDING, &con->flags);
880         send_to_sock(con);
881 }
882
883
884 /* Discard all entries on the write queues */
885 static void clean_writequeues(void)
886 {
887         int nodeid;
888
889         for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
890                 struct connection *con = nodeid2con(nodeid, 0);
891
892                 if (con)
893                         clean_one_writequeue(con);
894         }
895 }
896
897 static void work_stop(void)
898 {
899         destroy_workqueue(recv_workqueue);
900         destroy_workqueue(send_workqueue);
901 }
902
903 static int work_start(void)
904 {
905         int error;
906         recv_workqueue = create_workqueue("dlm_recv");
907         error = IS_ERR(recv_workqueue);
908         if (error) {
909                 log_print("can't start dlm_recv %d", error);
910                 return error;
911         }
912
913         send_workqueue = create_singlethread_workqueue("dlm_send");
914         error = IS_ERR(send_workqueue);
915         if (error) {
916                 log_print("can't start dlm_send %d", error);
917                 destroy_workqueue(recv_workqueue);
918                 return error;
919         }
920
921         return 0;
922 }
923
/*
 * Tear lowcomms down: poison every connection's flags so no further
 * socket callbacks queue work, stop the workqueues, then free all
 * queued data, connections and the connection cache.
 */
void dlm_lowcomms_stop(void)
{
        int i;

        /* Set all the flags to prevent any
           socket activity.
        */
        for (i = 0; i < conn_array_size; i++) {
                if (connections[i])
                        connections[i]->flags |= 0xFF;
        }

        work_stop();
        clean_writequeues();

        for (i = 0; i < conn_array_size; i++) {
                if (connections[i]) {
                        close_connection(connections[i], true);
                        /* othercon structs are cache objects too */
                        if (connections[i]->othercon)
                                kmem_cache_free(con_cache, connections[i]->othercon);
                        kmem_cache_free(con_cache, connections[i]);
                }
        }

        kfree(connections);
        connections = NULL;

        kmem_cache_destroy(con_cache);
}
953
954 /* This is quite likely to sleep... */
955 int dlm_lowcomms_start(void)
956 {
957         int error = 0;
958
959         error = -ENOMEM;
960         connections = kzalloc(sizeof(struct connection *) *
961                               NODE_INCREMENT, GFP_KERNEL);
962         if (!connections)
963                 goto out;
964
965         conn_array_size = NODE_INCREMENT;
966
967         if (dlm_our_addr(&dlm_local_addr, 0)) {
968                 log_print("no local IP address has been set");
969                 goto fail_free_conn;
970         }
971         if (!dlm_our_addr(&dlm_local_addr, 1)) {
972                 log_print("This dlm comms module does not support multi-homed clustering");
973                 goto fail_free_conn;
974         }
975
976         con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
977                                       __alignof__(struct connection), 0,
978                                       NULL, NULL);
979         if (!con_cache)
980                 goto fail_free_conn;
981
982
983         /* Start listening */
984         error = listen_for_all();
985         if (error)
986                 goto fail_unlisten;
987
988         error = work_start();
989         if (error)
990                 goto fail_unlisten;
991
992         return 0;
993
994 fail_unlisten:
995         close_connection(connections[0], false);
996         kmem_cache_free(con_cache, connections[0]);
997         kmem_cache_destroy(con_cache);
998
999 fail_free_conn:
1000         kfree(connections);
1001
1002 out:
1003         return error;
1004 }
1005
1006 /*
1007  * Overrides for Emacs so that we follow Linus's tabbing style.
1008  * Emacs will notice this stuff at the end of the file and automatically
1009  * adjust the settings for this buffer only.  This must remain at the end
1010  * of the file.
1011  * ---------------------------------------------------------------------------
1012  * Local variables:
1013  * c-file-style: "linux"
1014  * End:
1015  */