ocfs2_dlm: Calling post handler function in assert master handler
[powerpc.git] / fs / ocfs2 / dlm / dlmdomain.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmdomain.c
5  *
6  * defines domain join / leave apis
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26
27 #include <linux/module.h>
28 #include <linux/types.h>
29 #include <linux/slab.h>
30 #include <linux/highmem.h>
31 #include <linux/utsname.h>
32 #include <linux/init.h>
33 #include <linux/spinlock.h>
34 #include <linux/delay.h>
35 #include <linux/err.h>
36
37 #include "cluster/heartbeat.h"
38 #include "cluster/nodemanager.h"
39 #include "cluster/tcp.h"
40
41 #include "dlmapi.h"
42 #include "dlmcommon.h"
43
44 #include "dlmdomain.h"
45
46 #include "dlmver.h"
47
48 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
49 #include "cluster/masklog.h"
50
/*
 * Release the first @pages pages referenced by @vec (highest index
 * first) and then the pointer vector itself.
 */
static void dlm_free_pagevec(void **vec, int pages)
{
        int idx = pages;

        while (idx-- != 0)
                free_page((unsigned long)vec[idx]);

        kfree(vec);
}
57
58 static void **dlm_alloc_pagevec(int pages)
59 {
60         void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
61         int i;
62
63         if (!vec)
64                 return NULL;
65
66         for (i = 0; i < pages; i++)
67                 if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
68                         goto out_free;
69
70         mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
71              pages, (unsigned long)DLM_HASH_PAGES,
72              (unsigned long)DLM_BUCKETS_PER_PAGE);
73         return vec;
74 out_free:
75         dlm_free_pagevec(vec, i);
76         return NULL;
77 }
78
79 /*
80  *
81  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
82  *    dlm_domain_lock
83  *    struct dlm_ctxt->spinlock
84  *    struct dlm_lock_resource->spinlock
85  *    struct dlm_ctxt->master_lock
86  *    struct dlm_ctxt->ast_lock
87  *    dlm_master_list_entry->spinlock
88  *    dlm_lock->spinlock
89  *
90  */
91
92 DEFINE_SPINLOCK(dlm_domain_lock);
93 LIST_HEAD(dlm_domains);
94 static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
95
96 #define DLM_DOMAIN_BACKOFF_MS 200
97
98 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
99                                   void **ret_data);
100 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
101                                      void **ret_data);
102 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
103                                    void **ret_data);
104 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
105                                    void **ret_data);
106
107 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
108
109 void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
110 {
111         hlist_del_init(&lockres->hash_node);
112         dlm_lockres_put(lockres);
113 }
114
115 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
116                        struct dlm_lock_resource *res)
117 {
118         struct hlist_head *bucket;
119         struct qstr *q;
120
121         assert_spin_locked(&dlm->spinlock);
122
123         q = &res->lockname;
124         bucket = dlm_lockres_hash(dlm, q->hash);
125
126         /* get a reference for our hashtable */
127         dlm_lockres_get(res);
128
129         hlist_add_head(&res->hash_node, bucket);
130 }
131
132 struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
133                                                      const char *name,
134                                                      unsigned int len,
135                                                      unsigned int hash)
136 {
137         struct hlist_head *bucket;
138         struct hlist_node *list;
139
140         mlog_entry("%.*s\n", len, name);
141
142         assert_spin_locked(&dlm->spinlock);
143
144         bucket = dlm_lockres_hash(dlm, hash);
145
146         hlist_for_each(list, bucket) {
147                 struct dlm_lock_resource *res = hlist_entry(list,
148                         struct dlm_lock_resource, hash_node);
149                 if (res->lockname.name[0] != name[0])
150                         continue;
151                 if (unlikely(res->lockname.len != len))
152                         continue;
153                 if (memcmp(res->lockname.name + 1, name + 1, len - 1))
154                         continue;
155                 dlm_lockres_get(res);
156                 return res;
157         }
158         return NULL;
159 }
160
161 /* intended to be called by functions which do not care about lock
162  * resources which are being purged (most net _handler functions).
163  * this will return NULL for any lock resource which is found but
164  * currently in the process of dropping its mastery reference.
165  * use __dlm_lookup_lockres_full when you need the lock resource
166  * regardless (e.g. dlm_get_lock_resource) */
167 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
168                                                 const char *name,
169                                                 unsigned int len,
170                                                 unsigned int hash)
171 {
172         struct dlm_lock_resource *res = NULL;
173
174         mlog_entry("%.*s\n", len, name);
175
176         assert_spin_locked(&dlm->spinlock);
177
178         res = __dlm_lookup_lockres_full(dlm, name, len, hash);
179         if (res) {
180                 spin_lock(&res->spinlock);
181                 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
182                         spin_unlock(&res->spinlock);
183                         dlm_lockres_put(res);
184                         return NULL;
185                 }
186                 spin_unlock(&res->spinlock);
187         }
188
189         return res;
190 }
191
192 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
193                                     const char *name,
194                                     unsigned int len)
195 {
196         struct dlm_lock_resource *res;
197         unsigned int hash = dlm_lockid_hash(name, len);
198
199         spin_lock(&dlm->spinlock);
200         res = __dlm_lookup_lockres(dlm, name, len, hash);
201         spin_unlock(&dlm->spinlock);
202         return res;
203 }
204
205 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
206 {
207         struct dlm_ctxt *tmp = NULL;
208         struct list_head *iter;
209
210         assert_spin_locked(&dlm_domain_lock);
211
212         /* tmp->name here is always NULL terminated,
213          * but domain may not be! */
214         list_for_each(iter, &dlm_domains) {
215                 tmp = list_entry (iter, struct dlm_ctxt, list);
216                 if (strlen(tmp->name) == len &&
217                     memcmp(tmp->name, domain, len)==0)
218                         break;
219                 tmp = NULL;
220         }
221
222         return tmp;
223 }
224
225 /* For null terminated domain strings ONLY */
226 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
227 {
228         assert_spin_locked(&dlm_domain_lock);
229
230         return __dlm_lookup_domain_full(domain, strlen(domain));
231 }
232
233
234 /* returns true on one of two conditions:
235  * 1) the domain does not exist
236  * 2) the domain exists and it's state is "joined" */
237 static int dlm_wait_on_domain_helper(const char *domain)
238 {
239         int ret = 0;
240         struct dlm_ctxt *tmp = NULL;
241
242         spin_lock(&dlm_domain_lock);
243
244         tmp = __dlm_lookup_domain(domain);
245         if (!tmp)
246                 ret = 1;
247         else if (tmp->dlm_state == DLM_CTXT_JOINED)
248                 ret = 1;
249
250         spin_unlock(&dlm_domain_lock);
251         return ret;
252 }
253
254 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
255 {
256         if (dlm->lockres_hash)
257                 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
258
259         if (dlm->name)
260                 kfree(dlm->name);
261
262         kfree(dlm);
263 }
264
265 /* A little strange - this function will be called while holding
266  * dlm_domain_lock and is expected to be holding it on the way out. We
267  * will however drop and reacquire it multiple times */
268 static void dlm_ctxt_release(struct kref *kref)
269 {
270         struct dlm_ctxt *dlm;
271
272         dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
273
274         BUG_ON(dlm->num_joins);
275         BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
276
277         /* we may still be in the list if we hit an error during join. */
278         list_del_init(&dlm->list);
279
280         spin_unlock(&dlm_domain_lock);
281
282         mlog(0, "freeing memory from domain %s\n", dlm->name);
283
284         wake_up(&dlm_domain_events);
285
286         dlm_free_ctxt_mem(dlm);
287
288         spin_lock(&dlm_domain_lock);
289 }
290
291 void dlm_put(struct dlm_ctxt *dlm)
292 {
293         spin_lock(&dlm_domain_lock);
294         kref_put(&dlm->dlm_refs, dlm_ctxt_release);
295         spin_unlock(&dlm_domain_lock);
296 }
297
298 static void __dlm_get(struct dlm_ctxt *dlm)
299 {
300         kref_get(&dlm->dlm_refs);
301 }
302
303 /* given a questionable reference to a dlm object, gets a reference if
304  * it can find it in the list, otherwise returns NULL in which case
305  * you shouldn't trust your pointer. */
306 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
307 {
308         struct list_head *iter;
309         struct dlm_ctxt *target = NULL;
310
311         spin_lock(&dlm_domain_lock);
312
313         list_for_each(iter, &dlm_domains) {
314                 target = list_entry (iter, struct dlm_ctxt, list);
315
316                 if (target == dlm) {
317                         __dlm_get(target);
318                         break;
319                 }
320
321                 target = NULL;
322         }
323
324         spin_unlock(&dlm_domain_lock);
325
326         return target;
327 }
328
329 int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
330 {
331         int ret;
332
333         spin_lock(&dlm_domain_lock);
334         ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
335                 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
336         spin_unlock(&dlm_domain_lock);
337
338         return ret;
339 }
340
341 static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
342 {
343         if (dlm->dlm_worker) {
344                 flush_workqueue(dlm->dlm_worker);
345                 destroy_workqueue(dlm->dlm_worker);
346                 dlm->dlm_worker = NULL;
347         }
348 }
349
350 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
351 {
352         dlm_unregister_domain_handlers(dlm);
353         dlm_complete_thread(dlm);
354         dlm_complete_recovery_thread(dlm);
355         dlm_destroy_dlm_worker(dlm);
356
357         /* We've left the domain. Now we can take ourselves out of the
358          * list and allow the kref stuff to help us free the
359          * memory. */
360         spin_lock(&dlm_domain_lock);
361         list_del_init(&dlm->list);
362         spin_unlock(&dlm_domain_lock);
363
364         /* Wake up anyone waiting for us to remove this domain */
365         wake_up(&dlm_domain_events);
366 }
367
368 static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
369 {
370         int i, num, n, ret = 0;
371         struct dlm_lock_resource *res;
372         struct hlist_node *iter;
373         struct hlist_head *bucket;
374         int dropped;
375
376         mlog(0, "Migrating locks from domain %s\n", dlm->name);
377
378         num = 0;
379         spin_lock(&dlm->spinlock);
380         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
381 redo_bucket:
382                 n = 0;
383                 bucket = dlm_lockres_hash(dlm, i);
384                 iter = bucket->first;
385                 while (iter) {
386                         n++;
387                         res = hlist_entry(iter, struct dlm_lock_resource,
388                                           hash_node);
389                         dlm_lockres_get(res);
390                         /* migrate, if necessary.  this will drop the dlm
391                          * spinlock and retake it if it does migration. */
392                         dropped = dlm_empty_lockres(dlm, res);
393
394                         spin_lock(&res->spinlock);
395                         __dlm_lockres_calc_usage(dlm, res);
396                         iter = res->hash_node.next;
397                         spin_unlock(&res->spinlock);
398
399                         dlm_lockres_put(res);
400
401                         cond_resched_lock(&dlm->spinlock);
402
403                         if (dropped)
404                                 goto redo_bucket;
405                 }
406                 num += n;
407                 mlog(0, "%s: touched %d lockreses in bucket %d "
408                      "(tot=%d)\n", dlm->name, n, i, num);
409         }
410         spin_unlock(&dlm->spinlock);
411         wake_up(&dlm->dlm_thread_wq);
412
413         /* let the dlm thread take care of purging, keep scanning until
414          * nothing remains in the hash */
415         if (num) {
416                 mlog(0, "%s: %d lock resources in hash last pass\n",
417                      dlm->name, num);
418                 ret = -EAGAIN;
419         }
420         mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
421         return ret;
422 }
423
424 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
425 {
426         int ret;
427
428         spin_lock(&dlm->spinlock);
429         ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
430         spin_unlock(&dlm->spinlock);
431
432         return ret;
433 }
434
435 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
436 {
437         /* Yikes, a double spinlock! I need domain_lock for the dlm
438          * state and the dlm spinlock for join state... Sorry! */
439 again:
440         spin_lock(&dlm_domain_lock);
441         spin_lock(&dlm->spinlock);
442
443         if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
444                 mlog(0, "Node %d is joining, we wait on it.\n",
445                           dlm->joining_node);
446                 spin_unlock(&dlm->spinlock);
447                 spin_unlock(&dlm_domain_lock);
448
449                 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
450                 goto again;
451         }
452
453         dlm->dlm_state = DLM_CTXT_LEAVING;
454         spin_unlock(&dlm->spinlock);
455         spin_unlock(&dlm_domain_lock);
456 }
457
458 static void __dlm_print_nodes(struct dlm_ctxt *dlm)
459 {
460         int node = -1;
461
462         assert_spin_locked(&dlm->spinlock);
463
464         printk(KERN_INFO "ocfs2_dlm: Nodes in domain (\"%s\"): ", dlm->name);
465
466         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
467                                      node + 1)) < O2NM_MAX_NODES) {
468                 printk("%d ", node);
469         }
470         printk("\n");
471 }
472
473 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
474                                    void **ret_data)
475 {
476         struct dlm_ctxt *dlm = data;
477         unsigned int node;
478         struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
479
480         mlog_entry("%p %u %p", msg, len, data);
481
482         if (!dlm_grab(dlm))
483                 return 0;
484
485         node = exit_msg->node_idx;
486
487         printk(KERN_INFO "ocfs2_dlm: Node %u leaves domain %s\n", node, dlm->name);
488
489         spin_lock(&dlm->spinlock);
490         clear_bit(node, dlm->domain_map);
491         __dlm_print_nodes(dlm);
492
493         /* notify anything attached to the heartbeat events */
494         dlm_hb_event_notify_attached(dlm, node, 0);
495
496         spin_unlock(&dlm->spinlock);
497
498         dlm_put(dlm);
499
500         return 0;
501 }
502
503 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
504                                     unsigned int node)
505 {
506         int status;
507         struct dlm_exit_domain leave_msg;
508
509         mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
510                   node, dlm->name, dlm->node_num);
511
512         memset(&leave_msg, 0, sizeof(leave_msg));
513         leave_msg.node_idx = dlm->node_num;
514
515         status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
516                                     &leave_msg, sizeof(leave_msg), node,
517                                     NULL);
518
519         mlog(0, "status return %d from o2net_send_message\n", status);
520
521         return status;
522 }
523
524
525 static void dlm_leave_domain(struct dlm_ctxt *dlm)
526 {
527         int node, clear_node, status;
528
529         /* At this point we've migrated away all our locks and won't
530          * accept mastership of new ones. The dlm is responsible for
531          * almost nothing now. We make sure not to confuse any joining
532          * nodes and then commence shutdown procedure. */
533
534         spin_lock(&dlm->spinlock);
535         /* Clear ourselves from the domain map */
536         clear_bit(dlm->node_num, dlm->domain_map);
537         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
538                                      0)) < O2NM_MAX_NODES) {
539                 /* Drop the dlm spinlock. This is safe wrt the domain_map.
540                  * -nodes cannot be added now as the
541                  *   query_join_handlers knows to respond with OK_NO_MAP
542                  * -we catch the right network errors if a node is
543                  *   removed from the map while we're sending him the
544                  *   exit message. */
545                 spin_unlock(&dlm->spinlock);
546
547                 clear_node = 1;
548
549                 status = dlm_send_one_domain_exit(dlm, node);
550                 if (status < 0 &&
551                     status != -ENOPROTOOPT &&
552                     status != -ENOTCONN) {
553                         mlog(ML_NOTICE, "Error %d sending domain exit message "
554                              "to node %d\n", status, node);
555
556                         /* Not sure what to do here but lets sleep for
557                          * a bit in case this was a transient
558                          * error... */
559                         msleep(DLM_DOMAIN_BACKOFF_MS);
560                         clear_node = 0;
561                 }
562
563                 spin_lock(&dlm->spinlock);
564                 /* If we're not clearing the node bit then we intend
565                  * to loop back around to try again. */
566                 if (clear_node)
567                         clear_bit(node, dlm->domain_map);
568         }
569         spin_unlock(&dlm->spinlock);
570 }
571
572 int dlm_joined(struct dlm_ctxt *dlm)
573 {
574         int ret = 0;
575
576         spin_lock(&dlm_domain_lock);
577
578         if (dlm->dlm_state == DLM_CTXT_JOINED)
579                 ret = 1;
580
581         spin_unlock(&dlm_domain_lock);
582
583         return ret;
584 }
585
586 int dlm_shutting_down(struct dlm_ctxt *dlm)
587 {
588         int ret = 0;
589
590         spin_lock(&dlm_domain_lock);
591
592         if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
593                 ret = 1;
594
595         spin_unlock(&dlm_domain_lock);
596
597         return ret;
598 }
599
600 void dlm_unregister_domain(struct dlm_ctxt *dlm)
601 {
602         int leave = 0;
603
604         spin_lock(&dlm_domain_lock);
605         BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
606         BUG_ON(!dlm->num_joins);
607
608         dlm->num_joins--;
609         if (!dlm->num_joins) {
610                 /* We mark it "in shutdown" now so new register
611                  * requests wait until we've completely left the
612                  * domain. Don't use DLM_CTXT_LEAVING yet as we still
613                  * want new domain joins to communicate with us at
614                  * least until we've completed migration of our
615                  * resources. */
616                 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
617                 leave = 1;
618         }
619         spin_unlock(&dlm_domain_lock);
620
621         if (leave) {
622                 mlog(0, "shutting down domain %s\n", dlm->name);
623
624                 /* We changed dlm state, notify the thread */
625                 dlm_kick_thread(dlm, NULL);
626
627                 while (dlm_migrate_all_locks(dlm)) {
628                         mlog(0, "%s: more migration to do\n", dlm->name);
629                 }
630                 dlm_mark_domain_leaving(dlm);
631                 dlm_leave_domain(dlm);
632                 dlm_complete_dlm_shutdown(dlm);
633         }
634         dlm_put(dlm);
635 }
636 EXPORT_SYMBOL_GPL(dlm_unregister_domain);
637
638 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
639                                   void **ret_data)
640 {
641         struct dlm_query_join_request *query;
642         enum dlm_query_join_response response;
643         struct dlm_ctxt *dlm = NULL;
644
645         query = (struct dlm_query_join_request *) msg->buf;
646
647         mlog(0, "node %u wants to join domain %s\n", query->node_idx,
648                   query->domain);
649
650         /*
651          * If heartbeat doesn't consider the node live, tell it
652          * to back off and try again.  This gives heartbeat a chance
653          * to catch up.
654          */
655         if (!o2hb_check_node_heartbeating(query->node_idx)) {
656                 mlog(0, "node %u is not in our live map yet\n",
657                      query->node_idx);
658
659                 response = JOIN_DISALLOW;
660                 goto respond;
661         }
662
663         response = JOIN_OK_NO_MAP;
664
665         spin_lock(&dlm_domain_lock);
666         dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
667         /* Once the dlm ctxt is marked as leaving then we don't want
668          * to be put in someone's domain map. 
669          * Also, explicitly disallow joining at certain troublesome
670          * times (ie. during recovery). */
671         if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
672                 int bit = query->node_idx;
673                 spin_lock(&dlm->spinlock);
674
675                 if (dlm->dlm_state == DLM_CTXT_NEW &&
676                     dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
677                         /*If this is a brand new context and we
678                          * haven't started our join process yet, then
679                          * the other node won the race. */
680                         response = JOIN_OK_NO_MAP;
681                 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
682                         /* Disallow parallel joins. */
683                         response = JOIN_DISALLOW;
684                 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
685                         mlog(ML_NOTICE, "node %u trying to join, but recovery "
686                              "is ongoing.\n", bit);
687                         response = JOIN_DISALLOW;
688                 } else if (test_bit(bit, dlm->recovery_map)) {
689                         mlog(ML_NOTICE, "node %u trying to join, but it "
690                              "still needs recovery.\n", bit);
691                         response = JOIN_DISALLOW;
692                 } else if (test_bit(bit, dlm->domain_map)) {
693                         mlog(ML_NOTICE, "node %u trying to join, but it "
694                              "is still in the domain! needs recovery?\n",
695                              bit);
696                         response = JOIN_DISALLOW;
697                 } else {
698                         /* Alright we're fully a part of this domain
699                          * so we keep some state as to who's joining
700                          * and indicate to him that needs to be fixed
701                          * up. */
702                         response = JOIN_OK;
703                         __dlm_set_joining_node(dlm, query->node_idx);
704                 }
705
706                 spin_unlock(&dlm->spinlock);
707         }
708         spin_unlock(&dlm_domain_lock);
709
710 respond:
711         mlog(0, "We respond with %u\n", response);
712
713         return response;
714 }
715
716 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
717                                      void **ret_data)
718 {
719         struct dlm_assert_joined *assert;
720         struct dlm_ctxt *dlm = NULL;
721
722         assert = (struct dlm_assert_joined *) msg->buf;
723
724         mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
725                   assert->domain);
726
727         spin_lock(&dlm_domain_lock);
728         dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
729         /* XXX should we consider no dlm ctxt an error? */
730         if (dlm) {
731                 spin_lock(&dlm->spinlock);
732
733                 /* Alright, this node has officially joined our
734                  * domain. Set him in the map and clean up our
735                  * leftover join state. */
736                 BUG_ON(dlm->joining_node != assert->node_idx);
737                 set_bit(assert->node_idx, dlm->domain_map);
738                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
739
740                 printk(KERN_INFO "ocfs2_dlm: Node %u joins domain %s\n",
741                        assert->node_idx, dlm->name);
742                 __dlm_print_nodes(dlm);
743
744                 /* notify anything attached to the heartbeat events */
745                 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
746
747                 spin_unlock(&dlm->spinlock);
748         }
749         spin_unlock(&dlm_domain_lock);
750
751         return 0;
752 }
753
754 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
755                                    void **ret_data)
756 {
757         struct dlm_cancel_join *cancel;
758         struct dlm_ctxt *dlm = NULL;
759
760         cancel = (struct dlm_cancel_join *) msg->buf;
761
762         mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
763                   cancel->domain);
764
765         spin_lock(&dlm_domain_lock);
766         dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
767
768         if (dlm) {
769                 spin_lock(&dlm->spinlock);
770
771                 /* Yikes, this guy wants to cancel his join. No
772                  * problem, we simply cleanup our join state. */
773                 BUG_ON(dlm->joining_node != cancel->node_idx);
774                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
775
776                 spin_unlock(&dlm->spinlock);
777         }
778         spin_unlock(&dlm_domain_lock);
779
780         return 0;
781 }
782
783 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
784                                     unsigned int node)
785 {
786         int status;
787         struct dlm_cancel_join cancel_msg;
788
789         memset(&cancel_msg, 0, sizeof(cancel_msg));
790         cancel_msg.node_idx = dlm->node_num;
791         cancel_msg.name_len = strlen(dlm->name);
792         memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
793
794         status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
795                                     &cancel_msg, sizeof(cancel_msg), node,
796                                     NULL);
797         if (status < 0) {
798                 mlog_errno(status);
799                 goto bail;
800         }
801
802 bail:
803         return status;
804 }
805
806 /* map_size should be in bytes. */
807 static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
808                                  unsigned long *node_map,
809                                  unsigned int map_size)
810 {
811         int status, tmpstat;
812         unsigned int node;
813
814         if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
815                          sizeof(unsigned long))) {
816                 mlog(ML_ERROR,
817                      "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
818                      map_size, BITS_TO_LONGS(O2NM_MAX_NODES));
819                 return -EINVAL;
820         }
821
822         status = 0;
823         node = -1;
824         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
825                                      node + 1)) < O2NM_MAX_NODES) {
826                 if (node == dlm->node_num)
827                         continue;
828
829                 tmpstat = dlm_send_one_join_cancel(dlm, node);
830                 if (tmpstat) {
831                         mlog(ML_ERROR, "Error return %d cancelling join on "
832                              "node %d\n", tmpstat, node);
833                         if (!status)
834                                 status = tmpstat;
835                 }
836         }
837
838         if (status)
839                 mlog_errno(status);
840         return status;
841 }
842
843 static int dlm_request_join(struct dlm_ctxt *dlm,
844                             int node,
845                             enum dlm_query_join_response *response)
846 {
847         int status, retval;
848         struct dlm_query_join_request join_msg;
849
850         mlog(0, "querying node %d\n", node);
851
852         memset(&join_msg, 0, sizeof(join_msg));
853         join_msg.node_idx = dlm->node_num;
854         join_msg.name_len = strlen(dlm->name);
855         memcpy(join_msg.domain, dlm->name, join_msg.name_len);
856
857         status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
858                                     sizeof(join_msg), node, &retval);
859         if (status < 0 && status != -ENOPROTOOPT) {
860                 mlog_errno(status);
861                 goto bail;
862         }
863
864         /* -ENOPROTOOPT from the net code means the other side isn't
865             listening for our message type -- that's fine, it means
866             his dlm isn't up, so we can consider him a 'yes' but not
867             joined into the domain.  */
868         if (status == -ENOPROTOOPT) {
869                 status = 0;
870                 *response = JOIN_OK_NO_MAP;
871         } else if (retval == JOIN_DISALLOW ||
872                    retval == JOIN_OK ||
873                    retval == JOIN_OK_NO_MAP) {
874                 *response = retval;
875         } else {
876                 status = -EINVAL;
877                 mlog(ML_ERROR, "invalid response %d from node %u\n", retval,
878                      node);
879         }
880
881         mlog(0, "status %d, node %d response is %d\n", status, node,
882                   *response);
883
884 bail:
885         return status;
886 }
887
888 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
889                                     unsigned int node)
890 {
891         int status;
892         struct dlm_assert_joined assert_msg;
893
894         mlog(0, "Sending join assert to node %u\n", node);
895
896         memset(&assert_msg, 0, sizeof(assert_msg));
897         assert_msg.node_idx = dlm->node_num;
898         assert_msg.name_len = strlen(dlm->name);
899         memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
900
901         status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
902                                     &assert_msg, sizeof(assert_msg), node,
903                                     NULL);
904         if (status < 0)
905                 mlog_errno(status);
906
907         return status;
908 }
909
910 static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
911                                   unsigned long *node_map)
912 {
913         int status, node, live;
914
915         status = 0;
916         node = -1;
917         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
918                                      node + 1)) < O2NM_MAX_NODES) {
919                 if (node == dlm->node_num)
920                         continue;
921
922                 do {
923                         /* It is very important that this message be
924                          * received so we spin until either the node
925                          * has died or it gets the message. */
926                         status = dlm_send_one_join_assert(dlm, node);
927
928                         spin_lock(&dlm->spinlock);
929                         live = test_bit(node, dlm->live_nodes_map);
930                         spin_unlock(&dlm->spinlock);
931
932                         if (status) {
933                                 mlog(ML_ERROR, "Error return %d asserting "
934                                      "join on node %d\n", status, node);
935
936                                 /* give us some time between errors... */
937                                 if (live)
938                                         msleep(DLM_DOMAIN_BACKOFF_MS);
939                         }
940                 } while (status && live);
941         }
942 }
943
/* Scratch state for a single attempt at joining a domain. */
struct domain_join_ctxt {
        /* copy of dlm->live_nodes_map taken when the attempt starts */
        unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        /* one bit per node that answered the join query with JOIN_OK */
        unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
};
948
949 static int dlm_should_restart_join(struct dlm_ctxt *dlm,
950                                    struct domain_join_ctxt *ctxt,
951                                    enum dlm_query_join_response response)
952 {
953         int ret;
954
955         if (response == JOIN_DISALLOW) {
956                 mlog(0, "Latest response of disallow -- should restart\n");
957                 return 1;
958         }
959
960         spin_lock(&dlm->spinlock);
961         /* For now, we restart the process if the node maps have
962          * changed at all */
963         ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
964                      sizeof(dlm->live_nodes_map));
965         spin_unlock(&dlm->spinlock);
966
967         if (ret)
968                 mlog(0, "Node maps changed -- should restart\n");
969
970         return ret;
971 }
972
973 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
974 {
975         int status = 0, tmpstat, node;
976         struct domain_join_ctxt *ctxt;
977         enum dlm_query_join_response response;
978
979         mlog_entry("%p", dlm);
980
981         ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
982         if (!ctxt) {
983                 status = -ENOMEM;
984                 mlog_errno(status);
985                 goto bail;
986         }
987
988         /* group sem locking should work for us here -- we're already
989          * registered for heartbeat events so filling this should be
990          * atomic wrt getting those handlers called. */
991         o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
992
993         spin_lock(&dlm->spinlock);
994         memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
995
996         __dlm_set_joining_node(dlm, dlm->node_num);
997
998         spin_unlock(&dlm->spinlock);
999
1000         node = -1;
1001         while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
1002                                      node + 1)) < O2NM_MAX_NODES) {
1003                 if (node == dlm->node_num)
1004                         continue;
1005
1006                 status = dlm_request_join(dlm, node, &response);
1007                 if (status < 0) {
1008                         mlog_errno(status);
1009                         goto bail;
1010                 }
1011
1012                 /* Ok, either we got a response or the node doesn't have a
1013                  * dlm up. */
1014                 if (response == JOIN_OK)
1015                         set_bit(node, ctxt->yes_resp_map);
1016
1017                 if (dlm_should_restart_join(dlm, ctxt, response)) {
1018                         status = -EAGAIN;
1019                         goto bail;
1020                 }
1021         }
1022
1023         mlog(0, "Yay, done querying nodes!\n");
1024
1025         /* Yay, everyone agree's we can join the domain. My domain is
1026          * comprised of all nodes who were put in the
1027          * yes_resp_map. Copy that into our domain map and send a join
1028          * assert message to clean up everyone elses state. */
1029         spin_lock(&dlm->spinlock);
1030         memcpy(dlm->domain_map, ctxt->yes_resp_map,
1031                sizeof(ctxt->yes_resp_map));
1032         set_bit(dlm->node_num, dlm->domain_map);
1033         spin_unlock(&dlm->spinlock);
1034
1035         dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1036
1037         /* Joined state *must* be set before the joining node
1038          * information, otherwise the query_join handler may read no
1039          * current joiner but a state of NEW and tell joining nodes
1040          * we're not in the domain. */
1041         spin_lock(&dlm_domain_lock);
1042         dlm->dlm_state = DLM_CTXT_JOINED;
1043         dlm->num_joins++;
1044         spin_unlock(&dlm_domain_lock);
1045
1046 bail:
1047         spin_lock(&dlm->spinlock);
1048         __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1049         if (!status)
1050                 __dlm_print_nodes(dlm);
1051         spin_unlock(&dlm->spinlock);
1052
1053         if (ctxt) {
1054                 /* Do we need to send a cancel message to any nodes? */
1055                 if (status < 0) {
1056                         tmpstat = dlm_send_join_cancels(dlm,
1057                                                         ctxt->yes_resp_map,
1058                                                         sizeof(ctxt->yes_resp_map));
1059                         if (tmpstat < 0)
1060                                 mlog_errno(tmpstat);
1061                 }
1062                 kfree(ctxt);
1063         }
1064
1065         mlog(0, "returning %d\n", status);
1066         return status;
1067 }
1068
/*
 * Undo dlm_register_domain_handlers(): unregister the heartbeat up and down
 * callbacks, then every o2net handler linked on dlm->dlm_domain_handlers.
 */
static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
{
        o2hb_unregister_callback(&dlm->dlm_hb_up);
        o2hb_unregister_callback(&dlm->dlm_hb_down);
        o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
}
1075
1076 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1077 {
1078         int status;
1079
1080         mlog(0, "registering handlers.\n");
1081
1082         o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1083                             dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
1084         status = o2hb_register_callback(&dlm->dlm_hb_down);
1085         if (status)
1086                 goto bail;
1087
1088         o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1089                             dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
1090         status = o2hb_register_callback(&dlm->dlm_hb_up);
1091         if (status)
1092                 goto bail;
1093
1094         status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1095                                         sizeof(struct dlm_master_request),
1096                                         dlm_master_request_handler,
1097                                         dlm, NULL, &dlm->dlm_domain_handlers);
1098         if (status)
1099                 goto bail;
1100
1101         status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1102                                         sizeof(struct dlm_assert_master),
1103                                         dlm_assert_master_handler,
1104                                         dlm, dlm_assert_master_post_handler,
1105                                         &dlm->dlm_domain_handlers);
1106         if (status)
1107                 goto bail;
1108
1109         status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1110                                         sizeof(struct dlm_create_lock),
1111                                         dlm_create_lock_handler,
1112                                         dlm, NULL, &dlm->dlm_domain_handlers);
1113         if (status)
1114                 goto bail;
1115
1116         status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1117                                         DLM_CONVERT_LOCK_MAX_LEN,
1118                                         dlm_convert_lock_handler,
1119                                         dlm, NULL, &dlm->dlm_domain_handlers);
1120         if (status)
1121                 goto bail;
1122
1123         status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1124                                         DLM_UNLOCK_LOCK_MAX_LEN,
1125                                         dlm_unlock_lock_handler,
1126                                         dlm, NULL, &dlm->dlm_domain_handlers);
1127         if (status)
1128                 goto bail;
1129
1130         status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1131                                         DLM_PROXY_AST_MAX_LEN,
1132                                         dlm_proxy_ast_handler,
1133                                         dlm, NULL, &dlm->dlm_domain_handlers);
1134         if (status)
1135                 goto bail;
1136
1137         status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1138                                         sizeof(struct dlm_exit_domain),
1139                                         dlm_exit_domain_handler,
1140                                         dlm, NULL, &dlm->dlm_domain_handlers);
1141         if (status)
1142                 goto bail;
1143
1144         status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1145                                         sizeof(struct dlm_deref_lockres),
1146                                         dlm_deref_lockres_handler,
1147                                         dlm, NULL, &dlm->dlm_domain_handlers);
1148         if (status)
1149                 goto bail;
1150
1151         status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1152                                         sizeof(struct dlm_migrate_request),
1153                                         dlm_migrate_request_handler,
1154                                         dlm, NULL, &dlm->dlm_domain_handlers);
1155         if (status)
1156                 goto bail;
1157
1158         status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1159                                         DLM_MIG_LOCKRES_MAX_LEN,
1160                                         dlm_mig_lockres_handler,
1161                                         dlm, NULL, &dlm->dlm_domain_handlers);
1162         if (status)
1163                 goto bail;
1164
1165         status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1166                                         sizeof(struct dlm_master_requery),
1167                                         dlm_master_requery_handler,
1168                                         dlm, NULL, &dlm->dlm_domain_handlers);
1169         if (status)
1170                 goto bail;
1171
1172         status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1173                                         sizeof(struct dlm_lock_request),
1174                                         dlm_request_all_locks_handler,
1175                                         dlm, NULL, &dlm->dlm_domain_handlers);
1176         if (status)
1177                 goto bail;
1178
1179         status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1180                                         sizeof(struct dlm_reco_data_done),
1181                                         dlm_reco_data_done_handler,
1182                                         dlm, NULL, &dlm->dlm_domain_handlers);
1183         if (status)
1184                 goto bail;
1185
1186         status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1187                                         sizeof(struct dlm_begin_reco),
1188                                         dlm_begin_reco_handler,
1189                                         dlm, NULL, &dlm->dlm_domain_handlers);
1190         if (status)
1191                 goto bail;
1192
1193         status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1194                                         sizeof(struct dlm_finalize_reco),
1195                                         dlm_finalize_reco_handler,
1196                                         dlm, NULL, &dlm->dlm_domain_handlers);
1197         if (status)
1198                 goto bail;
1199
1200 bail:
1201         if (status)
1202                 dlm_unregister_domain_handlers(dlm);
1203
1204         return status;
1205 }
1206
1207 static int dlm_join_domain(struct dlm_ctxt *dlm)
1208 {
1209         int status;
1210
1211         BUG_ON(!dlm);
1212
1213         mlog(0, "Join domain %s\n", dlm->name);
1214
1215         status = dlm_register_domain_handlers(dlm);
1216         if (status) {
1217                 mlog_errno(status);
1218                 goto bail;
1219         }
1220
1221         status = dlm_launch_thread(dlm);
1222         if (status < 0) {
1223                 mlog_errno(status);
1224                 goto bail;
1225         }
1226
1227         status = dlm_launch_recovery_thread(dlm);
1228         if (status < 0) {
1229                 mlog_errno(status);
1230                 goto bail;
1231         }
1232
1233         dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
1234         if (!dlm->dlm_worker) {
1235                 status = -ENOMEM;
1236                 mlog_errno(status);
1237                 goto bail;
1238         }
1239
1240         do {
1241                 unsigned int backoff;
1242                 status = dlm_try_to_join_domain(dlm);
1243
1244                 /* If we're racing another node to the join, then we
1245                  * need to back off temporarily and let them
1246                  * complete. */
1247                 if (status == -EAGAIN) {
1248                         if (signal_pending(current)) {
1249                                 status = -ERESTARTSYS;
1250                                 goto bail;
1251                         }
1252
1253                         /*
1254                          * <chip> After you!
1255                          * <dale> No, after you!
1256                          * <chip> I insist!
1257                          * <dale> But you first!
1258                          * ...
1259                          */
1260                         backoff = (unsigned int)(jiffies & 0x3);
1261                         backoff *= DLM_DOMAIN_BACKOFF_MS;
1262                         mlog(0, "backoff %d\n", backoff);
1263                         msleep(backoff);
1264                 }
1265         } while (status == -EAGAIN);
1266
1267         if (status < 0) {
1268                 mlog_errno(status);
1269                 goto bail;
1270         }
1271
1272         status = 0;
1273 bail:
1274         wake_up(&dlm_domain_events);
1275
1276         if (status) {
1277                 dlm_unregister_domain_handlers(dlm);
1278                 dlm_complete_thread(dlm);
1279                 dlm_complete_recovery_thread(dlm);
1280                 dlm_destroy_dlm_worker(dlm);
1281         }
1282
1283         return status;
1284 }
1285
1286 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1287                                 u32 key)
1288 {
1289         int i;
1290         struct dlm_ctxt *dlm = NULL;
1291
1292         dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
1293         if (!dlm) {
1294                 mlog_errno(-ENOMEM);
1295                 goto leave;
1296         }
1297
1298         dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
1299         if (dlm->name == NULL) {
1300                 mlog_errno(-ENOMEM);
1301                 kfree(dlm);
1302                 dlm = NULL;
1303                 goto leave;
1304         }
1305
1306         dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1307         if (!dlm->lockres_hash) {
1308                 mlog_errno(-ENOMEM);
1309                 kfree(dlm->name);
1310                 kfree(dlm);
1311                 dlm = NULL;
1312                 goto leave;
1313         }
1314
1315         for (i = 0; i < DLM_HASH_BUCKETS; i++)
1316                 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
1317
1318         strcpy(dlm->name, domain);
1319         dlm->key = key;
1320         dlm->node_num = o2nm_this_node();
1321
1322         spin_lock_init(&dlm->spinlock);
1323         spin_lock_init(&dlm->master_lock);
1324         spin_lock_init(&dlm->ast_lock);
1325         INIT_LIST_HEAD(&dlm->list);
1326         INIT_LIST_HEAD(&dlm->dirty_list);
1327         INIT_LIST_HEAD(&dlm->reco.resources);
1328         INIT_LIST_HEAD(&dlm->reco.received);
1329         INIT_LIST_HEAD(&dlm->reco.node_data);
1330         INIT_LIST_HEAD(&dlm->purge_list);
1331         INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
1332         dlm->reco.state = 0;
1333
1334         INIT_LIST_HEAD(&dlm->pending_asts);
1335         INIT_LIST_HEAD(&dlm->pending_basts);
1336
1337         mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1338                   dlm->recovery_map, &(dlm->recovery_map[0]));
1339
1340         memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1341         memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1342         memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1343
1344         dlm->dlm_thread_task = NULL;
1345         dlm->dlm_reco_thread_task = NULL;
1346         dlm->dlm_worker = NULL;
1347         init_waitqueue_head(&dlm->dlm_thread_wq);
1348         init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1349         init_waitqueue_head(&dlm->reco.event);
1350         init_waitqueue_head(&dlm->ast_wq);
1351         init_waitqueue_head(&dlm->migration_wq);
1352         INIT_LIST_HEAD(&dlm->master_list);
1353         INIT_LIST_HEAD(&dlm->mle_hb_events);
1354
1355         dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1356         init_waitqueue_head(&dlm->dlm_join_events);
1357
1358         dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1359         dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
1360         atomic_set(&dlm->local_resources, 0);
1361         atomic_set(&dlm->remote_resources, 0);
1362         atomic_set(&dlm->unknown_resources, 0);
1363
1364         spin_lock_init(&dlm->work_lock);
1365         INIT_LIST_HEAD(&dlm->work_list);
1366         INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
1367
1368         kref_init(&dlm->dlm_refs);
1369         dlm->dlm_state = DLM_CTXT_NEW;
1370
1371         INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
1372
1373         mlog(0, "context init: refcount %u\n",
1374                   atomic_read(&dlm->dlm_refs.refcount));
1375
1376 leave:
1377         return dlm;
1378 }
1379
/*
 * dlm_register_domain: one-time setup per "domain"
 *
 * Looks up the context for @domain, creating and joining it if it does not
 * exist yet.  If the domain already exists and is joined, a reference is
 * taken on it and its num_joins count is bumped.  If it exists but is not
 * joined yet, the caller sleeps on dlm_domain_events and retries.
 *
 * Returns the dlm context on success, or an ERR_PTR() on failure:
 * -ENAMETOOLONG if @domain is longer than O2NM_MAX_NAME_LEN, -EPROTO if the
 * local node is not heartbeating, -ERESTARTSYS if a signal is pending,
 * -ENOMEM if the context cannot be allocated, or the status returned by
 * dlm_join_domain().
 */
struct dlm_ctxt * dlm_register_domain(const char *domain,
                               u32 key)
{
        int ret;
        struct dlm_ctxt *dlm = NULL;
        struct dlm_ctxt *new_ctxt = NULL;

        if (strlen(domain) > O2NM_MAX_NAME_LEN) {
                ret = -ENAMETOOLONG;
                mlog(ML_ERROR, "domain name length too long\n");
                goto leave;
        }

        if (!o2hb_check_local_node_heartbeating()) {
                mlog(ML_ERROR, "the local node has not been configured, or is "
                     "not heartbeating\n");
                ret = -EPROTO;
                goto leave;
        }

        mlog(0, "register called for domain \"%s\"\n", domain);

retry:
        /* each pass starts from scratch; new_ctxt survives across passes */
        dlm = NULL;
        if (signal_pending(current)) {
                ret = -ERESTARTSYS;
                mlog_errno(ret);
                goto leave;
        }

        spin_lock(&dlm_domain_lock);

        dlm = __dlm_lookup_domain(domain);
        if (dlm) {
                if (dlm->dlm_state != DLM_CTXT_JOINED) {
                        spin_unlock(&dlm_domain_lock);

                        mlog(0, "This ctxt is not joined yet!\n");
                        /* the wait result is not checked here; a pending
                         * signal is caught at the top of the retry pass */
                        wait_event_interruptible(dlm_domain_events,
                                                 dlm_wait_on_domain_helper(
                                                         domain));
                        goto retry;
                }

                /* existing, joined domain: share it with the caller */
                __dlm_get(dlm);
                dlm->num_joins++;

                spin_unlock(&dlm_domain_lock);

                ret = 0;
                goto leave;
        }

        /* doesn't exist */
        if (!new_ctxt) {
                /* drop the spinlock around the GFP_KERNEL allocation, then
                 * look the domain up again on the next pass */
                spin_unlock(&dlm_domain_lock);

                new_ctxt = dlm_alloc_ctxt(domain, key);
                if (new_ctxt)
                        goto retry;

                ret = -ENOMEM;
                mlog_errno(ret);
                goto leave;
        }

        /* a little variable switch-a-roo here... */
        /* clearing new_ctxt keeps the cleanup at leave: from freeing the
         * context that is now being used */
        dlm = new_ctxt;
        new_ctxt = NULL;

        /* add the new domain */
        list_add_tail(&dlm->list, &dlm_domains);
        spin_unlock(&dlm_domain_lock);

        ret = dlm_join_domain(dlm);
        if (ret) {
                mlog_errno(ret);
                dlm_put(dlm);
                goto leave;
        }

        ret = 0;
leave:
        /* a context allocated above but never used is freed here */
        if (new_ctxt)
                dlm_free_ctxt_mem(new_ctxt);

        if (ret < 0)
                dlm = ERR_PTR(ret);

        return dlm;
}
EXPORT_SYMBOL_GPL(dlm_register_domain);
1475
/* o2net handlers registered by dlm_register_net_handlers() under
 * DLM_MOD_KEY are linked here. */
static LIST_HEAD(dlm_join_handlers);

/* Unregister every o2net handler linked on dlm_join_handlers. */
static void dlm_unregister_net_handlers(void)
{
        o2net_unregister_handler_list(&dlm_join_handlers);
}
1482
1483 static int dlm_register_net_handlers(void)
1484 {
1485         int status = 0;
1486
1487         status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1488                                         sizeof(struct dlm_query_join_request),
1489                                         dlm_query_join_handler,
1490                                         NULL, NULL, &dlm_join_handlers);
1491         if (status)
1492                 goto bail;
1493
1494         status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1495                                         sizeof(struct dlm_assert_joined),
1496                                         dlm_assert_joined_handler,
1497                                         NULL, NULL, &dlm_join_handlers);
1498         if (status)
1499                 goto bail;
1500
1501         status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1502                                         sizeof(struct dlm_cancel_join),
1503                                         dlm_cancel_join_handler,
1504                                         NULL, NULL, &dlm_join_handlers);
1505
1506 bail:
1507         if (status < 0)
1508                 dlm_unregister_net_handlers();
1509
1510         return status;
1511 }
1512
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */
1520
/* Eviction is not expected to happen often, so a per-domain lock is
 * not necessary. Eviction callbacks are allowed to sleep for short
 * periods of time.
 *
 * Taken for reading while callbacks are fired, and for writing while a
 * callback is added to or removed from a domain's list. */
static DECLARE_RWSEM(dlm_callback_sem);
1525
1526 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
1527                                         int node_num)
1528 {
1529         struct list_head *iter;
1530         struct dlm_eviction_cb *cb;
1531
1532         down_read(&dlm_callback_sem);
1533         list_for_each(iter, &dlm->dlm_eviction_callbacks) {
1534                 cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
1535
1536                 cb->ec_func(node_num, cb->ec_data);
1537         }
1538         up_read(&dlm_callback_sem);
1539 }
1540
1541 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
1542                            dlm_eviction_func *f,
1543                            void *data)
1544 {
1545         INIT_LIST_HEAD(&cb->ec_item);
1546         cb->ec_func = f;
1547         cb->ec_data = data;
1548 }
1549 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
1550
/*
 * Append @cb to the eviction callback list of @dlm.  The list is modified
 * with dlm_callback_sem held for writing.
 */
void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
                              struct dlm_eviction_cb *cb)
{
        down_write(&dlm_callback_sem);
        list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
        up_write(&dlm_callback_sem);
}
EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
1559
/*
 * Unlink @cb from whichever eviction callback list it is on and
 * re-initialise its list linkage.  Done with dlm_callback_sem held for
 * writing.
 */
void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
{
        down_write(&dlm_callback_sem);
        list_del_init(&cb->ec_item);
        up_write(&dlm_callback_sem);
}
EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
1567
1568 static int __init dlm_init(void)
1569 {
1570         int status;
1571
1572         dlm_print_version();
1573
1574         status = dlm_init_mle_cache();
1575         if (status)
1576                 return -1;
1577
1578         status = dlm_register_net_handlers();
1579         if (status) {
1580                 dlm_destroy_mle_cache();
1581                 return -1;
1582         }
1583
1584         return 0;
1585 }
1586
/*
 * Module exit point: unregister the join-protocol network handlers and
 * destroy the mle cache, the reverse of the order used in dlm_init().
 */
static void __exit dlm_exit (void)
{
        dlm_unregister_net_handlers();
        dlm_destroy_mle_cache();
}
1592
1593 MODULE_AUTHOR("Oracle");
1594 MODULE_LICENSE("GPL");
1595
1596 module_init(dlm_init);
1597 module_exit(dlm_exit);