ocfs2_dlm: fix cluster-wide refcounting of lock resources
[powerpc.git] / fs / ocfs2 / dlm / dlmdomain.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmdomain.c
5  *
6  * defines domain join / leave apis
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26
27 #include <linux/module.h>
28 #include <linux/types.h>
29 #include <linux/slab.h>
30 #include <linux/highmem.h>
31 #include <linux/utsname.h>
32 #include <linux/init.h>
33 #include <linux/spinlock.h>
34 #include <linux/delay.h>
35 #include <linux/err.h>
36
37 #include "cluster/heartbeat.h"
38 #include "cluster/nodemanager.h"
39 #include "cluster/tcp.h"
40
41 #include "dlmapi.h"
42 #include "dlmcommon.h"
43
44 #include "dlmdomain.h"
45
46 #include "dlmver.h"
47
48 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
49 #include "cluster/masklog.h"
50
/*
 * Free a page vector: hand back the first @pages pages referenced by
 * @vec (highest index first), then free the pointer array itself.
 */
static void dlm_free_pagevec(void **vec, int pages)
{
	int idx;

	for (idx = pages - 1; idx >= 0; idx--)
		free_page((unsigned long)vec[idx]);

	kfree(vec);
}
57
58 static void **dlm_alloc_pagevec(int pages)
59 {
60         void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
61         int i;
62
63         if (!vec)
64                 return NULL;
65
66         for (i = 0; i < pages; i++)
67                 if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
68                         goto out_free;
69
70         mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
71              pages, (unsigned long)DLM_HASH_PAGES,
72              (unsigned long)DLM_BUCKETS_PER_PAGE);
73         return vec;
74 out_free:
75         dlm_free_pagevec(vec, i);
76         return NULL;
77 }
78
79 /*
80  *
81  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
82  *    dlm_domain_lock
83  *    struct dlm_ctxt->spinlock
84  *    struct dlm_lock_resource->spinlock
85  *    struct dlm_ctxt->master_lock
86  *    struct dlm_ctxt->ast_lock
87  *    dlm_master_list_entry->spinlock
88  *    dlm_lock->spinlock
89  *
90  */
91
92 DEFINE_SPINLOCK(dlm_domain_lock);
93 LIST_HEAD(dlm_domains);
94 static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
95
96 #define DLM_DOMAIN_BACKOFF_MS 200
97
98 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data);
99 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data);
100 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data);
101 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data);
102
103 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
104
105 void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
106 {
107         hlist_del_init(&lockres->hash_node);
108         dlm_lockres_put(lockres);
109 }
110
111 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
112                        struct dlm_lock_resource *res)
113 {
114         struct hlist_head *bucket;
115         struct qstr *q;
116
117         assert_spin_locked(&dlm->spinlock);
118
119         q = &res->lockname;
120         bucket = dlm_lockres_hash(dlm, q->hash);
121
122         /* get a reference for our hashtable */
123         dlm_lockres_get(res);
124
125         hlist_add_head(&res->hash_node, bucket);
126 }
127
128 struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
129                                                      const char *name,
130                                                      unsigned int len,
131                                                      unsigned int hash)
132 {
133         struct hlist_head *bucket;
134         struct hlist_node *list;
135
136         mlog_entry("%.*s\n", len, name);
137
138         assert_spin_locked(&dlm->spinlock);
139
140         bucket = dlm_lockres_hash(dlm, hash);
141
142         hlist_for_each(list, bucket) {
143                 struct dlm_lock_resource *res = hlist_entry(list,
144                         struct dlm_lock_resource, hash_node);
145                 if (res->lockname.name[0] != name[0])
146                         continue;
147                 if (unlikely(res->lockname.len != len))
148                         continue;
149                 if (memcmp(res->lockname.name + 1, name + 1, len - 1))
150                         continue;
151                 dlm_lockres_get(res);
152                 return res;
153         }
154         return NULL;
155 }
156
157 /* intended to be called by functions which do not care about lock
158  * resources which are being purged (most net _handler functions).
159  * this will return NULL for any lock resource which is found but
160  * currently in the process of dropping its mastery reference.
161  * use __dlm_lookup_lockres_full when you need the lock resource
162  * regardless (e.g. dlm_get_lock_resource) */
163 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
164                                                 const char *name,
165                                                 unsigned int len,
166                                                 unsigned int hash)
167 {
168         struct dlm_lock_resource *res = NULL;
169
170         mlog_entry("%.*s\n", len, name);
171
172         assert_spin_locked(&dlm->spinlock);
173
174         res = __dlm_lookup_lockres_full(dlm, name, len, hash);
175         if (res) {
176                 spin_lock(&res->spinlock);
177                 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
178                         spin_unlock(&res->spinlock);
179                         dlm_lockres_put(res);
180                         return NULL;
181                 }
182                 spin_unlock(&res->spinlock);
183         }
184
185         return res;
186 }
187
188 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
189                                     const char *name,
190                                     unsigned int len)
191 {
192         struct dlm_lock_resource *res;
193         unsigned int hash = dlm_lockid_hash(name, len);
194
195         spin_lock(&dlm->spinlock);
196         res = __dlm_lookup_lockres(dlm, name, len, hash);
197         spin_unlock(&dlm->spinlock);
198         return res;
199 }
200
201 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
202 {
203         struct dlm_ctxt *tmp = NULL;
204         struct list_head *iter;
205
206         assert_spin_locked(&dlm_domain_lock);
207
208         /* tmp->name here is always NULL terminated,
209          * but domain may not be! */
210         list_for_each(iter, &dlm_domains) {
211                 tmp = list_entry (iter, struct dlm_ctxt, list);
212                 if (strlen(tmp->name) == len &&
213                     memcmp(tmp->name, domain, len)==0)
214                         break;
215                 tmp = NULL;
216         }
217
218         return tmp;
219 }
220
221 /* For null terminated domain strings ONLY */
222 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
223 {
224         assert_spin_locked(&dlm_domain_lock);
225
226         return __dlm_lookup_domain_full(domain, strlen(domain));
227 }
228
229
230 /* returns true on one of two conditions:
231  * 1) the domain does not exist
232  * 2) the domain exists and it's state is "joined" */
233 static int dlm_wait_on_domain_helper(const char *domain)
234 {
235         int ret = 0;
236         struct dlm_ctxt *tmp = NULL;
237
238         spin_lock(&dlm_domain_lock);
239
240         tmp = __dlm_lookup_domain(domain);
241         if (!tmp)
242                 ret = 1;
243         else if (tmp->dlm_state == DLM_CTXT_JOINED)
244                 ret = 1;
245
246         spin_unlock(&dlm_domain_lock);
247         return ret;
248 }
249
250 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
251 {
252         if (dlm->lockres_hash)
253                 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
254
255         if (dlm->name)
256                 kfree(dlm->name);
257
258         kfree(dlm);
259 }
260
261 /* A little strange - this function will be called while holding
262  * dlm_domain_lock and is expected to be holding it on the way out. We
263  * will however drop and reacquire it multiple times */
264 static void dlm_ctxt_release(struct kref *kref)
265 {
266         struct dlm_ctxt *dlm;
267
268         dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
269
270         BUG_ON(dlm->num_joins);
271         BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
272
273         /* we may still be in the list if we hit an error during join. */
274         list_del_init(&dlm->list);
275
276         spin_unlock(&dlm_domain_lock);
277
278         mlog(0, "freeing memory from domain %s\n", dlm->name);
279
280         wake_up(&dlm_domain_events);
281
282         dlm_free_ctxt_mem(dlm);
283
284         spin_lock(&dlm_domain_lock);
285 }
286
287 void dlm_put(struct dlm_ctxt *dlm)
288 {
289         spin_lock(&dlm_domain_lock);
290         kref_put(&dlm->dlm_refs, dlm_ctxt_release);
291         spin_unlock(&dlm_domain_lock);
292 }
293
294 static void __dlm_get(struct dlm_ctxt *dlm)
295 {
296         kref_get(&dlm->dlm_refs);
297 }
298
299 /* given a questionable reference to a dlm object, gets a reference if
300  * it can find it in the list, otherwise returns NULL in which case
301  * you shouldn't trust your pointer. */
302 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
303 {
304         struct list_head *iter;
305         struct dlm_ctxt *target = NULL;
306
307         spin_lock(&dlm_domain_lock);
308
309         list_for_each(iter, &dlm_domains) {
310                 target = list_entry (iter, struct dlm_ctxt, list);
311
312                 if (target == dlm) {
313                         __dlm_get(target);
314                         break;
315                 }
316
317                 target = NULL;
318         }
319
320         spin_unlock(&dlm_domain_lock);
321
322         return target;
323 }
324
325 int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
326 {
327         int ret;
328
329         spin_lock(&dlm_domain_lock);
330         ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
331                 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
332         spin_unlock(&dlm_domain_lock);
333
334         return ret;
335 }
336
337 static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
338 {
339         if (dlm->dlm_worker) {
340                 flush_workqueue(dlm->dlm_worker);
341                 destroy_workqueue(dlm->dlm_worker);
342                 dlm->dlm_worker = NULL;
343         }
344 }
345
346 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
347 {
348         dlm_unregister_domain_handlers(dlm);
349         dlm_complete_thread(dlm);
350         dlm_complete_recovery_thread(dlm);
351         dlm_destroy_dlm_worker(dlm);
352
353         /* We've left the domain. Now we can take ourselves out of the
354          * list and allow the kref stuff to help us free the
355          * memory. */
356         spin_lock(&dlm_domain_lock);
357         list_del_init(&dlm->list);
358         spin_unlock(&dlm_domain_lock);
359
360         /* Wake up anyone waiting for us to remove this domain */
361         wake_up(&dlm_domain_events);
362 }
363
364 static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
365 {
366         int i, num, n, ret = 0;
367         struct dlm_lock_resource *res;
368         struct hlist_node *iter;
369         struct hlist_head *bucket;
370         int dropped;
371
372         mlog(0, "Migrating locks from domain %s\n", dlm->name);
373
374         num = 0;
375         spin_lock(&dlm->spinlock);
376         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
377 redo_bucket:
378                 n = 0;
379                 bucket = dlm_lockres_hash(dlm, i);
380                 iter = bucket->first;
381                 while (iter) {
382                         n++;
383                         res = hlist_entry(iter, struct dlm_lock_resource,
384                                           hash_node);
385                         dlm_lockres_get(res);
386                         /* migrate, if necessary.  this will drop the dlm
387                          * spinlock and retake it if it does migration. */
388                         dropped = dlm_empty_lockres(dlm, res);
389
390                         spin_lock(&res->spinlock);
391                         __dlm_lockres_calc_usage(dlm, res);
392                         iter = res->hash_node.next;
393                         spin_unlock(&res->spinlock);
394
395                         dlm_lockres_put(res);
396
397                         cond_resched_lock(&dlm->spinlock);
398
399                         if (dropped)
400                                 goto redo_bucket;
401                 }
402                 num += n;
403                 mlog(0, "%s: touched %d lockreses in bucket %d "
404                      "(tot=%d)\n", dlm->name, n, i, num);
405         }
406         spin_unlock(&dlm->spinlock);
407         wake_up(&dlm->dlm_thread_wq);
408
409         /* let the dlm thread take care of purging, keep scanning until
410          * nothing remains in the hash */
411         if (num) {
412                 mlog(0, "%s: %d lock resources in hash last pass\n",
413                      dlm->name, num);
414                 ret = -EAGAIN;
415         }
416         mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
417         return ret;
418 }
419
420 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
421 {
422         int ret;
423
424         spin_lock(&dlm->spinlock);
425         ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
426         spin_unlock(&dlm->spinlock);
427
428         return ret;
429 }
430
431 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
432 {
433         /* Yikes, a double spinlock! I need domain_lock for the dlm
434          * state and the dlm spinlock for join state... Sorry! */
435 again:
436         spin_lock(&dlm_domain_lock);
437         spin_lock(&dlm->spinlock);
438
439         if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
440                 mlog(0, "Node %d is joining, we wait on it.\n",
441                           dlm->joining_node);
442                 spin_unlock(&dlm->spinlock);
443                 spin_unlock(&dlm_domain_lock);
444
445                 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
446                 goto again;
447         }
448
449         dlm->dlm_state = DLM_CTXT_LEAVING;
450         spin_unlock(&dlm->spinlock);
451         spin_unlock(&dlm_domain_lock);
452 }
453
454 static void __dlm_print_nodes(struct dlm_ctxt *dlm)
455 {
456         int node = -1;
457
458         assert_spin_locked(&dlm->spinlock);
459
460         printk(KERN_INFO "ocfs2_dlm: Nodes in domain (\"%s\"): ", dlm->name);
461
462         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
463                                      node + 1)) < O2NM_MAX_NODES) {
464                 printk("%d ", node);
465         }
466         printk("\n");
467 }
468
469 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data)
470 {
471         struct dlm_ctxt *dlm = data;
472         unsigned int node;
473         struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
474
475         mlog_entry("%p %u %p", msg, len, data);
476
477         if (!dlm_grab(dlm))
478                 return 0;
479
480         node = exit_msg->node_idx;
481
482         printk(KERN_INFO "ocfs2_dlm: Node %u leaves domain %s\n", node, dlm->name);
483
484         spin_lock(&dlm->spinlock);
485         clear_bit(node, dlm->domain_map);
486         __dlm_print_nodes(dlm);
487
488         /* notify anything attached to the heartbeat events */
489         dlm_hb_event_notify_attached(dlm, node, 0);
490
491         spin_unlock(&dlm->spinlock);
492
493         dlm_put(dlm);
494
495         return 0;
496 }
497
498 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
499                                     unsigned int node)
500 {
501         int status;
502         struct dlm_exit_domain leave_msg;
503
504         mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
505                   node, dlm->name, dlm->node_num);
506
507         memset(&leave_msg, 0, sizeof(leave_msg));
508         leave_msg.node_idx = dlm->node_num;
509
510         status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
511                                     &leave_msg, sizeof(leave_msg), node,
512                                     NULL);
513
514         mlog(0, "status return %d from o2net_send_message\n", status);
515
516         return status;
517 }
518
519
520 static void dlm_leave_domain(struct dlm_ctxt *dlm)
521 {
522         int node, clear_node, status;
523
524         /* At this point we've migrated away all our locks and won't
525          * accept mastership of new ones. The dlm is responsible for
526          * almost nothing now. We make sure not to confuse any joining
527          * nodes and then commence shutdown procedure. */
528
529         spin_lock(&dlm->spinlock);
530         /* Clear ourselves from the domain map */
531         clear_bit(dlm->node_num, dlm->domain_map);
532         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
533                                      0)) < O2NM_MAX_NODES) {
534                 /* Drop the dlm spinlock. This is safe wrt the domain_map.
535                  * -nodes cannot be added now as the
536                  *   query_join_handlers knows to respond with OK_NO_MAP
537                  * -we catch the right network errors if a node is
538                  *   removed from the map while we're sending him the
539                  *   exit message. */
540                 spin_unlock(&dlm->spinlock);
541
542                 clear_node = 1;
543
544                 status = dlm_send_one_domain_exit(dlm, node);
545                 if (status < 0 &&
546                     status != -ENOPROTOOPT &&
547                     status != -ENOTCONN) {
548                         mlog(ML_NOTICE, "Error %d sending domain exit message "
549                              "to node %d\n", status, node);
550
551                         /* Not sure what to do here but lets sleep for
552                          * a bit in case this was a transient
553                          * error... */
554                         msleep(DLM_DOMAIN_BACKOFF_MS);
555                         clear_node = 0;
556                 }
557
558                 spin_lock(&dlm->spinlock);
559                 /* If we're not clearing the node bit then we intend
560                  * to loop back around to try again. */
561                 if (clear_node)
562                         clear_bit(node, dlm->domain_map);
563         }
564         spin_unlock(&dlm->spinlock);
565 }
566
567 int dlm_joined(struct dlm_ctxt *dlm)
568 {
569         int ret = 0;
570
571         spin_lock(&dlm_domain_lock);
572
573         if (dlm->dlm_state == DLM_CTXT_JOINED)
574                 ret = 1;
575
576         spin_unlock(&dlm_domain_lock);
577
578         return ret;
579 }
580
581 int dlm_shutting_down(struct dlm_ctxt *dlm)
582 {
583         int ret = 0;
584
585         spin_lock(&dlm_domain_lock);
586
587         if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
588                 ret = 1;
589
590         spin_unlock(&dlm_domain_lock);
591
592         return ret;
593 }
594
595 void dlm_unregister_domain(struct dlm_ctxt *dlm)
596 {
597         int leave = 0;
598
599         spin_lock(&dlm_domain_lock);
600         BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
601         BUG_ON(!dlm->num_joins);
602
603         dlm->num_joins--;
604         if (!dlm->num_joins) {
605                 /* We mark it "in shutdown" now so new register
606                  * requests wait until we've completely left the
607                  * domain. Don't use DLM_CTXT_LEAVING yet as we still
608                  * want new domain joins to communicate with us at
609                  * least until we've completed migration of our
610                  * resources. */
611                 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
612                 leave = 1;
613         }
614         spin_unlock(&dlm_domain_lock);
615
616         if (leave) {
617                 mlog(0, "shutting down domain %s\n", dlm->name);
618
619                 /* We changed dlm state, notify the thread */
620                 dlm_kick_thread(dlm, NULL);
621
622                 while (dlm_migrate_all_locks(dlm)) {
623                         mlog(0, "%s: more migration to do\n", dlm->name);
624                 }
625                 dlm_mark_domain_leaving(dlm);
626                 dlm_leave_domain(dlm);
627                 dlm_complete_dlm_shutdown(dlm);
628         }
629         dlm_put(dlm);
630 }
631 EXPORT_SYMBOL_GPL(dlm_unregister_domain);
632
633 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
634 {
635         struct dlm_query_join_request *query;
636         enum dlm_query_join_response response;
637         struct dlm_ctxt *dlm = NULL;
638
639         query = (struct dlm_query_join_request *) msg->buf;
640
641         mlog(0, "node %u wants to join domain %s\n", query->node_idx,
642                   query->domain);
643
644         /*
645          * If heartbeat doesn't consider the node live, tell it
646          * to back off and try again.  This gives heartbeat a chance
647          * to catch up.
648          */
649         if (!o2hb_check_node_heartbeating(query->node_idx)) {
650                 mlog(0, "node %u is not in our live map yet\n",
651                      query->node_idx);
652
653                 response = JOIN_DISALLOW;
654                 goto respond;
655         }
656
657         response = JOIN_OK_NO_MAP;
658
659         spin_lock(&dlm_domain_lock);
660         dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
661         /* Once the dlm ctxt is marked as leaving then we don't want
662          * to be put in someone's domain map. 
663          * Also, explicitly disallow joining at certain troublesome
664          * times (ie. during recovery). */
665         if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
666                 int bit = query->node_idx;
667                 spin_lock(&dlm->spinlock);
668
669                 if (dlm->dlm_state == DLM_CTXT_NEW &&
670                     dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
671                         /*If this is a brand new context and we
672                          * haven't started our join process yet, then
673                          * the other node won the race. */
674                         response = JOIN_OK_NO_MAP;
675                 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
676                         /* Disallow parallel joins. */
677                         response = JOIN_DISALLOW;
678                 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
679                         mlog(ML_NOTICE, "node %u trying to join, but recovery "
680                              "is ongoing.\n", bit);
681                         response = JOIN_DISALLOW;
682                 } else if (test_bit(bit, dlm->recovery_map)) {
683                         mlog(ML_NOTICE, "node %u trying to join, but it "
684                              "still needs recovery.\n", bit);
685                         response = JOIN_DISALLOW;
686                 } else if (test_bit(bit, dlm->domain_map)) {
687                         mlog(ML_NOTICE, "node %u trying to join, but it "
688                              "is still in the domain! needs recovery?\n",
689                              bit);
690                         response = JOIN_DISALLOW;
691                 } else {
692                         /* Alright we're fully a part of this domain
693                          * so we keep some state as to who's joining
694                          * and indicate to him that needs to be fixed
695                          * up. */
696                         response = JOIN_OK;
697                         __dlm_set_joining_node(dlm, query->node_idx);
698                 }
699
700                 spin_unlock(&dlm->spinlock);
701         }
702         spin_unlock(&dlm_domain_lock);
703
704 respond:
705         mlog(0, "We respond with %u\n", response);
706
707         return response;
708 }
709
710 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
711 {
712         struct dlm_assert_joined *assert;
713         struct dlm_ctxt *dlm = NULL;
714
715         assert = (struct dlm_assert_joined *) msg->buf;
716
717         mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
718                   assert->domain);
719
720         spin_lock(&dlm_domain_lock);
721         dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
722         /* XXX should we consider no dlm ctxt an error? */
723         if (dlm) {
724                 spin_lock(&dlm->spinlock);
725
726                 /* Alright, this node has officially joined our
727                  * domain. Set him in the map and clean up our
728                  * leftover join state. */
729                 BUG_ON(dlm->joining_node != assert->node_idx);
730                 set_bit(assert->node_idx, dlm->domain_map);
731                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
732
733                 printk(KERN_INFO "ocfs2_dlm: Node %u joins domain %s\n",
734                        assert->node_idx, dlm->name);
735                 __dlm_print_nodes(dlm);
736
737                 /* notify anything attached to the heartbeat events */
738                 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
739
740                 spin_unlock(&dlm->spinlock);
741         }
742         spin_unlock(&dlm_domain_lock);
743
744         return 0;
745 }
746
747 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data)
748 {
749         struct dlm_cancel_join *cancel;
750         struct dlm_ctxt *dlm = NULL;
751
752         cancel = (struct dlm_cancel_join *) msg->buf;
753
754         mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
755                   cancel->domain);
756
757         spin_lock(&dlm_domain_lock);
758         dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
759
760         if (dlm) {
761                 spin_lock(&dlm->spinlock);
762
763                 /* Yikes, this guy wants to cancel his join. No
764                  * problem, we simply cleanup our join state. */
765                 BUG_ON(dlm->joining_node != cancel->node_idx);
766                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
767
768                 spin_unlock(&dlm->spinlock);
769         }
770         spin_unlock(&dlm_domain_lock);
771
772         return 0;
773 }
774
775 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
776                                     unsigned int node)
777 {
778         int status;
779         struct dlm_cancel_join cancel_msg;
780
781         memset(&cancel_msg, 0, sizeof(cancel_msg));
782         cancel_msg.node_idx = dlm->node_num;
783         cancel_msg.name_len = strlen(dlm->name);
784         memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
785
786         status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
787                                     &cancel_msg, sizeof(cancel_msg), node,
788                                     NULL);
789         if (status < 0) {
790                 mlog_errno(status);
791                 goto bail;
792         }
793
794 bail:
795         return status;
796 }
797
798 /* map_size should be in bytes. */
799 static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
800                                  unsigned long *node_map,
801                                  unsigned int map_size)
802 {
803         int status, tmpstat;
804         unsigned int node;
805
806         if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
807                          sizeof(unsigned long))) {
808                 mlog(ML_ERROR,
809                      "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
810                      map_size, BITS_TO_LONGS(O2NM_MAX_NODES));
811                 return -EINVAL;
812         }
813
814         status = 0;
815         node = -1;
816         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
817                                      node + 1)) < O2NM_MAX_NODES) {
818                 if (node == dlm->node_num)
819                         continue;
820
821                 tmpstat = dlm_send_one_join_cancel(dlm, node);
822                 if (tmpstat) {
823                         mlog(ML_ERROR, "Error return %d cancelling join on "
824                              "node %d\n", tmpstat, node);
825                         if (!status)
826                                 status = tmpstat;
827                 }
828         }
829
830         if (status)
831                 mlog_errno(status);
832         return status;
833 }
834
835 static int dlm_request_join(struct dlm_ctxt *dlm,
836                             int node,
837                             enum dlm_query_join_response *response)
838 {
839         int status, retval;
840         struct dlm_query_join_request join_msg;
841
842         mlog(0, "querying node %d\n", node);
843
844         memset(&join_msg, 0, sizeof(join_msg));
845         join_msg.node_idx = dlm->node_num;
846         join_msg.name_len = strlen(dlm->name);
847         memcpy(join_msg.domain, dlm->name, join_msg.name_len);
848
849         status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
850                                     sizeof(join_msg), node, &retval);
851         if (status < 0 && status != -ENOPROTOOPT) {
852                 mlog_errno(status);
853                 goto bail;
854         }
855
856         /* -ENOPROTOOPT from the net code means the other side isn't
857             listening for our message type -- that's fine, it means
858             his dlm isn't up, so we can consider him a 'yes' but not
859             joined into the domain.  */
860         if (status == -ENOPROTOOPT) {
861                 status = 0;
862                 *response = JOIN_OK_NO_MAP;
863         } else if (retval == JOIN_DISALLOW ||
864                    retval == JOIN_OK ||
865                    retval == JOIN_OK_NO_MAP) {
866                 *response = retval;
867         } else {
868                 status = -EINVAL;
869                 mlog(ML_ERROR, "invalid response %d from node %u\n", retval,
870                      node);
871         }
872
873         mlog(0, "status %d, node %d response is %d\n", status, node,
874                   *response);
875
876 bail:
877         return status;
878 }
879
880 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
881                                     unsigned int node)
882 {
883         int status;
884         struct dlm_assert_joined assert_msg;
885
886         mlog(0, "Sending join assert to node %u\n", node);
887
888         memset(&assert_msg, 0, sizeof(assert_msg));
889         assert_msg.node_idx = dlm->node_num;
890         assert_msg.name_len = strlen(dlm->name);
891         memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
892
893         status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
894                                     &assert_msg, sizeof(assert_msg), node,
895                                     NULL);
896         if (status < 0)
897                 mlog_errno(status);
898
899         return status;
900 }
901
902 static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
903                                   unsigned long *node_map)
904 {
905         int status, node, live;
906
907         status = 0;
908         node = -1;
909         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
910                                      node + 1)) < O2NM_MAX_NODES) {
911                 if (node == dlm->node_num)
912                         continue;
913
914                 do {
915                         /* It is very important that this message be
916                          * received so we spin until either the node
917                          * has died or it gets the message. */
918                         status = dlm_send_one_join_assert(dlm, node);
919
920                         spin_lock(&dlm->spinlock);
921                         live = test_bit(node, dlm->live_nodes_map);
922                         spin_unlock(&dlm->spinlock);
923
924                         if (status) {
925                                 mlog(ML_ERROR, "Error return %d asserting "
926                                      "join on node %d\n", status, node);
927
928                                 /* give us some time between errors... */
929                                 if (live)
930                                         msleep(DLM_DOMAIN_BACKOFF_MS);
931                         }
932                 } while (status && live);
933         }
934 }
935
936 struct domain_join_ctxt {
937         unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
938         unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
939 };
940
941 static int dlm_should_restart_join(struct dlm_ctxt *dlm,
942                                    struct domain_join_ctxt *ctxt,
943                                    enum dlm_query_join_response response)
944 {
945         int ret;
946
947         if (response == JOIN_DISALLOW) {
948                 mlog(0, "Latest response of disallow -- should restart\n");
949                 return 1;
950         }
951
952         spin_lock(&dlm->spinlock);
953         /* For now, we restart the process if the node maps have
954          * changed at all */
955         ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
956                      sizeof(dlm->live_nodes_map));
957         spin_unlock(&dlm->spinlock);
958
959         if (ret)
960                 mlog(0, "Node maps changed -- should restart\n");
961
962         return ret;
963 }
964
965 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
966 {
967         int status = 0, tmpstat, node;
968         struct domain_join_ctxt *ctxt;
969         enum dlm_query_join_response response;
970
971         mlog_entry("%p", dlm);
972
973         ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
974         if (!ctxt) {
975                 status = -ENOMEM;
976                 mlog_errno(status);
977                 goto bail;
978         }
979
980         /* group sem locking should work for us here -- we're already
981          * registered for heartbeat events so filling this should be
982          * atomic wrt getting those handlers called. */
983         o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
984
985         spin_lock(&dlm->spinlock);
986         memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
987
988         __dlm_set_joining_node(dlm, dlm->node_num);
989
990         spin_unlock(&dlm->spinlock);
991
992         node = -1;
993         while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
994                                      node + 1)) < O2NM_MAX_NODES) {
995                 if (node == dlm->node_num)
996                         continue;
997
998                 status = dlm_request_join(dlm, node, &response);
999                 if (status < 0) {
1000                         mlog_errno(status);
1001                         goto bail;
1002                 }
1003
1004                 /* Ok, either we got a response or the node doesn't have a
1005                  * dlm up. */
1006                 if (response == JOIN_OK)
1007                         set_bit(node, ctxt->yes_resp_map);
1008
1009                 if (dlm_should_restart_join(dlm, ctxt, response)) {
1010                         status = -EAGAIN;
1011                         goto bail;
1012                 }
1013         }
1014
1015         mlog(0, "Yay, done querying nodes!\n");
1016
1017         /* Yay, everyone agree's we can join the domain. My domain is
1018          * comprised of all nodes who were put in the
1019          * yes_resp_map. Copy that into our domain map and send a join
1020          * assert message to clean up everyone elses state. */
1021         spin_lock(&dlm->spinlock);
1022         memcpy(dlm->domain_map, ctxt->yes_resp_map,
1023                sizeof(ctxt->yes_resp_map));
1024         set_bit(dlm->node_num, dlm->domain_map);
1025         spin_unlock(&dlm->spinlock);
1026
1027         dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1028
1029         /* Joined state *must* be set before the joining node
1030          * information, otherwise the query_join handler may read no
1031          * current joiner but a state of NEW and tell joining nodes
1032          * we're not in the domain. */
1033         spin_lock(&dlm_domain_lock);
1034         dlm->dlm_state = DLM_CTXT_JOINED;
1035         dlm->num_joins++;
1036         spin_unlock(&dlm_domain_lock);
1037
1038 bail:
1039         spin_lock(&dlm->spinlock);
1040         __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1041         if (!status)
1042                 __dlm_print_nodes(dlm);
1043         spin_unlock(&dlm->spinlock);
1044
1045         if (ctxt) {
1046                 /* Do we need to send a cancel message to any nodes? */
1047                 if (status < 0) {
1048                         tmpstat = dlm_send_join_cancels(dlm,
1049                                                         ctxt->yes_resp_map,
1050                                                         sizeof(ctxt->yes_resp_map));
1051                         if (tmpstat < 0)
1052                                 mlog_errno(tmpstat);
1053                 }
1054                 kfree(ctxt);
1055         }
1056
1057         mlog(0, "returning %d\n", status);
1058         return status;
1059 }
1060
1061 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
1062 {
1063         o2hb_unregister_callback(&dlm->dlm_hb_up);
1064         o2hb_unregister_callback(&dlm->dlm_hb_down);
1065         o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
1066 }
1067
1068 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1069 {
1070         int status;
1071
1072         mlog(0, "registering handlers.\n");
1073
1074         o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1075                             dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
1076         status = o2hb_register_callback(&dlm->dlm_hb_down);
1077         if (status)
1078                 goto bail;
1079
1080         o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1081                             dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
1082         status = o2hb_register_callback(&dlm->dlm_hb_up);
1083         if (status)
1084                 goto bail;
1085
1086         status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1087                                         sizeof(struct dlm_master_request),
1088                                         dlm_master_request_handler,
1089                                         dlm, &dlm->dlm_domain_handlers);
1090         if (status)
1091                 goto bail;
1092
1093         status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1094                                         sizeof(struct dlm_assert_master),
1095                                         dlm_assert_master_handler,
1096                                         dlm, &dlm->dlm_domain_handlers);
1097         if (status)
1098                 goto bail;
1099
1100         status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1101                                         sizeof(struct dlm_create_lock),
1102                                         dlm_create_lock_handler,
1103                                         dlm, &dlm->dlm_domain_handlers);
1104         if (status)
1105                 goto bail;
1106
1107         status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1108                                         DLM_CONVERT_LOCK_MAX_LEN,
1109                                         dlm_convert_lock_handler,
1110                                         dlm, &dlm->dlm_domain_handlers);
1111         if (status)
1112                 goto bail;
1113
1114         status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1115                                         DLM_UNLOCK_LOCK_MAX_LEN,
1116                                         dlm_unlock_lock_handler,
1117                                         dlm, &dlm->dlm_domain_handlers);
1118         if (status)
1119                 goto bail;
1120
1121         status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1122                                         DLM_PROXY_AST_MAX_LEN,
1123                                         dlm_proxy_ast_handler,
1124                                         dlm, &dlm->dlm_domain_handlers);
1125         if (status)
1126                 goto bail;
1127
1128         status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1129                                         sizeof(struct dlm_exit_domain),
1130                                         dlm_exit_domain_handler,
1131                                         dlm, &dlm->dlm_domain_handlers);
1132         if (status)
1133                 goto bail;
1134
1135         status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1136                                         sizeof(struct dlm_deref_lockres),
1137                                         dlm_deref_lockres_handler,
1138                                         dlm, &dlm->dlm_domain_handlers);
1139         if (status)
1140                 goto bail;
1141
1142         status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1143                                         sizeof(struct dlm_migrate_request),
1144                                         dlm_migrate_request_handler,
1145                                         dlm, &dlm->dlm_domain_handlers);
1146         if (status)
1147                 goto bail;
1148
1149         status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1150                                         DLM_MIG_LOCKRES_MAX_LEN,
1151                                         dlm_mig_lockres_handler,
1152                                         dlm, &dlm->dlm_domain_handlers);
1153         if (status)
1154                 goto bail;
1155
1156         status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1157                                         sizeof(struct dlm_master_requery),
1158                                         dlm_master_requery_handler,
1159                                         dlm, &dlm->dlm_domain_handlers);
1160         if (status)
1161                 goto bail;
1162
1163         status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1164                                         sizeof(struct dlm_lock_request),
1165                                         dlm_request_all_locks_handler,
1166                                         dlm, &dlm->dlm_domain_handlers);
1167         if (status)
1168                 goto bail;
1169
1170         status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1171                                         sizeof(struct dlm_reco_data_done),
1172                                         dlm_reco_data_done_handler,
1173                                         dlm, &dlm->dlm_domain_handlers);
1174         if (status)
1175                 goto bail;
1176
1177         status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1178                                         sizeof(struct dlm_begin_reco),
1179                                         dlm_begin_reco_handler,
1180                                         dlm, &dlm->dlm_domain_handlers);
1181         if (status)
1182                 goto bail;
1183
1184         status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1185                                         sizeof(struct dlm_finalize_reco),
1186                                         dlm_finalize_reco_handler,
1187                                         dlm, &dlm->dlm_domain_handlers);
1188         if (status)
1189                 goto bail;
1190
1191 bail:
1192         if (status)
1193                 dlm_unregister_domain_handlers(dlm);
1194
1195         return status;
1196 }
1197
1198 static int dlm_join_domain(struct dlm_ctxt *dlm)
1199 {
1200         int status;
1201
1202         BUG_ON(!dlm);
1203
1204         mlog(0, "Join domain %s\n", dlm->name);
1205
1206         status = dlm_register_domain_handlers(dlm);
1207         if (status) {
1208                 mlog_errno(status);
1209                 goto bail;
1210         }
1211
1212         status = dlm_launch_thread(dlm);
1213         if (status < 0) {
1214                 mlog_errno(status);
1215                 goto bail;
1216         }
1217
1218         status = dlm_launch_recovery_thread(dlm);
1219         if (status < 0) {
1220                 mlog_errno(status);
1221                 goto bail;
1222         }
1223
1224         dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
1225         if (!dlm->dlm_worker) {
1226                 status = -ENOMEM;
1227                 mlog_errno(status);
1228                 goto bail;
1229         }
1230
1231         do {
1232                 unsigned int backoff;
1233                 status = dlm_try_to_join_domain(dlm);
1234
1235                 /* If we're racing another node to the join, then we
1236                  * need to back off temporarily and let them
1237                  * complete. */
1238                 if (status == -EAGAIN) {
1239                         if (signal_pending(current)) {
1240                                 status = -ERESTARTSYS;
1241                                 goto bail;
1242                         }
1243
1244                         /*
1245                          * <chip> After you!
1246                          * <dale> No, after you!
1247                          * <chip> I insist!
1248                          * <dale> But you first!
1249                          * ...
1250                          */
1251                         backoff = (unsigned int)(jiffies & 0x3);
1252                         backoff *= DLM_DOMAIN_BACKOFF_MS;
1253                         mlog(0, "backoff %d\n", backoff);
1254                         msleep(backoff);
1255                 }
1256         } while (status == -EAGAIN);
1257
1258         if (status < 0) {
1259                 mlog_errno(status);
1260                 goto bail;
1261         }
1262
1263         status = 0;
1264 bail:
1265         wake_up(&dlm_domain_events);
1266
1267         if (status) {
1268                 dlm_unregister_domain_handlers(dlm);
1269                 dlm_complete_thread(dlm);
1270                 dlm_complete_recovery_thread(dlm);
1271                 dlm_destroy_dlm_worker(dlm);
1272         }
1273
1274         return status;
1275 }
1276
1277 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1278                                 u32 key)
1279 {
1280         int i;
1281         struct dlm_ctxt *dlm = NULL;
1282
1283         dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
1284         if (!dlm) {
1285                 mlog_errno(-ENOMEM);
1286                 goto leave;
1287         }
1288
1289         dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
1290         if (dlm->name == NULL) {
1291                 mlog_errno(-ENOMEM);
1292                 kfree(dlm);
1293                 dlm = NULL;
1294                 goto leave;
1295         }
1296
1297         dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1298         if (!dlm->lockres_hash) {
1299                 mlog_errno(-ENOMEM);
1300                 kfree(dlm->name);
1301                 kfree(dlm);
1302                 dlm = NULL;
1303                 goto leave;
1304         }
1305
1306         for (i = 0; i < DLM_HASH_BUCKETS; i++)
1307                 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
1308
1309         strcpy(dlm->name, domain);
1310         dlm->key = key;
1311         dlm->node_num = o2nm_this_node();
1312
1313         spin_lock_init(&dlm->spinlock);
1314         spin_lock_init(&dlm->master_lock);
1315         spin_lock_init(&dlm->ast_lock);
1316         INIT_LIST_HEAD(&dlm->list);
1317         INIT_LIST_HEAD(&dlm->dirty_list);
1318         INIT_LIST_HEAD(&dlm->reco.resources);
1319         INIT_LIST_HEAD(&dlm->reco.received);
1320         INIT_LIST_HEAD(&dlm->reco.node_data);
1321         INIT_LIST_HEAD(&dlm->purge_list);
1322         INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
1323         dlm->reco.state = 0;
1324
1325         INIT_LIST_HEAD(&dlm->pending_asts);
1326         INIT_LIST_HEAD(&dlm->pending_basts);
1327
1328         mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1329                   dlm->recovery_map, &(dlm->recovery_map[0]));
1330
1331         memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1332         memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1333         memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1334
1335         dlm->dlm_thread_task = NULL;
1336         dlm->dlm_reco_thread_task = NULL;
1337         dlm->dlm_worker = NULL;
1338         init_waitqueue_head(&dlm->dlm_thread_wq);
1339         init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1340         init_waitqueue_head(&dlm->reco.event);
1341         init_waitqueue_head(&dlm->ast_wq);
1342         init_waitqueue_head(&dlm->migration_wq);
1343         INIT_LIST_HEAD(&dlm->master_list);
1344         INIT_LIST_HEAD(&dlm->mle_hb_events);
1345
1346         dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1347         init_waitqueue_head(&dlm->dlm_join_events);
1348
1349         dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1350         dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
1351         atomic_set(&dlm->local_resources, 0);
1352         atomic_set(&dlm->remote_resources, 0);
1353         atomic_set(&dlm->unknown_resources, 0);
1354
1355         spin_lock_init(&dlm->work_lock);
1356         INIT_LIST_HEAD(&dlm->work_list);
1357         INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
1358
1359         kref_init(&dlm->dlm_refs);
1360         dlm->dlm_state = DLM_CTXT_NEW;
1361
1362         INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
1363
1364         mlog(0, "context init: refcount %u\n",
1365                   atomic_read(&dlm->dlm_refs.refcount));
1366
1367 leave:
1368         return dlm;
1369 }
1370
1371 /*
1372  * dlm_register_domain: one-time setup per "domain"
1373  */
1374 struct dlm_ctxt * dlm_register_domain(const char *domain,
1375                                u32 key)
1376 {
1377         int ret;
1378         struct dlm_ctxt *dlm = NULL;
1379         struct dlm_ctxt *new_ctxt = NULL;
1380
1381         if (strlen(domain) > O2NM_MAX_NAME_LEN) {
1382                 ret = -ENAMETOOLONG;
1383                 mlog(ML_ERROR, "domain name length too long\n");
1384                 goto leave;
1385         }
1386
1387         if (!o2hb_check_local_node_heartbeating()) {
1388                 mlog(ML_ERROR, "the local node has not been configured, or is "
1389                      "not heartbeating\n");
1390                 ret = -EPROTO;
1391                 goto leave;
1392         }
1393
1394         mlog(0, "register called for domain \"%s\"\n", domain);
1395
1396 retry:
1397         dlm = NULL;
1398         if (signal_pending(current)) {
1399                 ret = -ERESTARTSYS;
1400                 mlog_errno(ret);
1401                 goto leave;
1402         }
1403
1404         spin_lock(&dlm_domain_lock);
1405
1406         dlm = __dlm_lookup_domain(domain);
1407         if (dlm) {
1408                 if (dlm->dlm_state != DLM_CTXT_JOINED) {
1409                         spin_unlock(&dlm_domain_lock);
1410
1411                         mlog(0, "This ctxt is not joined yet!\n");
1412                         wait_event_interruptible(dlm_domain_events,
1413                                                  dlm_wait_on_domain_helper(
1414                                                          domain));
1415                         goto retry;
1416                 }
1417
1418                 __dlm_get(dlm);
1419                 dlm->num_joins++;
1420
1421                 spin_unlock(&dlm_domain_lock);
1422
1423                 ret = 0;
1424                 goto leave;
1425         }
1426
1427         /* doesn't exist */
1428         if (!new_ctxt) {
1429                 spin_unlock(&dlm_domain_lock);
1430
1431                 new_ctxt = dlm_alloc_ctxt(domain, key);
1432                 if (new_ctxt)
1433                         goto retry;
1434
1435                 ret = -ENOMEM;
1436                 mlog_errno(ret);
1437                 goto leave;
1438         }
1439
1440         /* a little variable switch-a-roo here... */
1441         dlm = new_ctxt;
1442         new_ctxt = NULL;
1443
1444         /* add the new domain */
1445         list_add_tail(&dlm->list, &dlm_domains);
1446         spin_unlock(&dlm_domain_lock);
1447
1448         ret = dlm_join_domain(dlm);
1449         if (ret) {
1450                 mlog_errno(ret);
1451                 dlm_put(dlm);
1452                 goto leave;
1453         }
1454
1455         ret = 0;
1456 leave:
1457         if (new_ctxt)
1458                 dlm_free_ctxt_mem(new_ctxt);
1459
1460         if (ret < 0)
1461                 dlm = ERR_PTR(ret);
1462
1463         return dlm;
1464 }
1465 EXPORT_SYMBOL_GPL(dlm_register_domain);
1466
/* o2net handlers registered by dlm_register_net_handlers(). */
static LIST_HEAD(dlm_join_handlers);
1468
1469 static void dlm_unregister_net_handlers(void)
1470 {
1471         o2net_unregister_handler_list(&dlm_join_handlers);
1472 }
1473
1474 static int dlm_register_net_handlers(void)
1475 {
1476         int status = 0;
1477
1478         status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1479                                         sizeof(struct dlm_query_join_request),
1480                                         dlm_query_join_handler,
1481                                         NULL, &dlm_join_handlers);
1482         if (status)
1483                 goto bail;
1484
1485         status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1486                                         sizeof(struct dlm_assert_joined),
1487                                         dlm_assert_joined_handler,
1488                                         NULL, &dlm_join_handlers);
1489         if (status)
1490                 goto bail;
1491
1492         status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1493                                         sizeof(struct dlm_cancel_join),
1494                                         dlm_cancel_join_handler,
1495                                         NULL, &dlm_join_handlers);
1496
1497 bail:
1498         if (status < 0)
1499                 dlm_unregister_net_handlers();
1500
1501         return status;
1502 }
1503
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */
1511
/* One rwsem guards the eviction callback lists of all domains: eviction
 * is not expected to happen often, so a per-domain lock is not
 * necessary. Eviction callbacks are allowed to sleep for short periods
 * of time. */
static DECLARE_RWSEM(dlm_callback_sem);
1516
1517 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
1518                                         int node_num)
1519 {
1520         struct list_head *iter;
1521         struct dlm_eviction_cb *cb;
1522
1523         down_read(&dlm_callback_sem);
1524         list_for_each(iter, &dlm->dlm_eviction_callbacks) {
1525                 cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
1526
1527                 cb->ec_func(node_num, cb->ec_data);
1528         }
1529         up_read(&dlm_callback_sem);
1530 }
1531
1532 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
1533                            dlm_eviction_func *f,
1534                            void *data)
1535 {
1536         INIT_LIST_HEAD(&cb->ec_item);
1537         cb->ec_func = f;
1538         cb->ec_data = data;
1539 }
1540 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
1541
1542 void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
1543                               struct dlm_eviction_cb *cb)
1544 {
1545         down_write(&dlm_callback_sem);
1546         list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
1547         up_write(&dlm_callback_sem);
1548 }
1549 EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
1550
1551 void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
1552 {
1553         down_write(&dlm_callback_sem);
1554         list_del_init(&cb->ec_item);
1555         up_write(&dlm_callback_sem);
1556 }
1557 EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
1558
1559 static int __init dlm_init(void)
1560 {
1561         int status;
1562
1563         dlm_print_version();
1564
1565         status = dlm_init_mle_cache();
1566         if (status)
1567                 return -1;
1568
1569         status = dlm_register_net_handlers();
1570         if (status) {
1571                 dlm_destroy_mle_cache();
1572                 return -1;
1573         }
1574
1575         return 0;
1576 }
1577
1578 static void __exit dlm_exit (void)
1579 {
1580         dlm_unregister_net_handlers();
1581         dlm_destroy_mle_cache();
1582 }
1583
1584 MODULE_AUTHOR("Oracle");
1585 MODULE_LICENSE("GPL");
1586
1587 module_init(dlm_init);
1588 module_exit(dlm_exit);