[PATCH] Clean up ocfs2 hash probe and make it faster
[powerpc.git] / fs / ocfs2 / dlm / dlmdomain.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmdomain.c
5  *
6  * defines domain join / leave apis
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26
27 #include <linux/module.h>
28 #include <linux/types.h>
29 #include <linux/slab.h>
30 #include <linux/highmem.h>
31 #include <linux/utsname.h>
32 #include <linux/init.h>
33 #include <linux/spinlock.h>
34 #include <linux/delay.h>
35 #include <linux/err.h>
36
37 #include "cluster/heartbeat.h"
38 #include "cluster/nodemanager.h"
39 #include "cluster/tcp.h"
40
41 #include "dlmapi.h"
42 #include "dlmcommon.h"
43
44 #include "dlmdebug.h"
45 #include "dlmdomain.h"
46
47 #include "dlmver.h"
48
49 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
50 #include "cluster/masklog.h"
51
52 /*
53  *
54  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
55  *    dlm_domain_lock
56  *    struct dlm_ctxt->spinlock
57  *    struct dlm_lock_resource->spinlock
58  *    struct dlm_ctxt->master_lock
59  *    struct dlm_ctxt->ast_lock
60  *    dlm_master_list_entry->spinlock
61  *    dlm_lock->spinlock
62  *
63  */
64
65 spinlock_t dlm_domain_lock = SPIN_LOCK_UNLOCKED; /* held while walking dlm_domains and while reading/writing dlm_ctxt->dlm_state in this file */
66 LIST_HEAD(dlm_domains); /* list of struct dlm_ctxt, linked through dlm_ctxt->list */
67 static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events); /* woken when a dlm_ctxt is taken off dlm_domains or freed */
68
69 #define DLM_DOMAIN_BACKOFF_MS 200 /* msleep() delay, in ms, before retrying a failed domain exit / join assert message */
70
71 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data);
72 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data);
73 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data);
74 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data);
75
76 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
77
78 void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
79 {
80         hlist_del_init(&lockres->hash_node);
81         dlm_lockres_put(lockres);
82 }
83
84 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
85                        struct dlm_lock_resource *res)
86 {
87         struct hlist_head *bucket;
88         struct qstr *q;
89
90         assert_spin_locked(&dlm->spinlock);
91
92         q = &res->lockname;
93         bucket = &(dlm->lockres_hash[q->hash % DLM_HASH_BUCKETS]);
94
95         /* get a reference for our hashtable */
96         dlm_lockres_get(res);
97
98         hlist_add_head(&res->hash_node, bucket);
99 }
100
101 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
102                                                 const char *name,
103                                                 unsigned int len,
104                                                 unsigned int hash)
105 {
106         struct hlist_head *bucket;
107         struct hlist_node *list;
108
109         mlog_entry("%.*s\n", len, name);
110
111         assert_spin_locked(&dlm->spinlock);
112
113         bucket = dlm->lockres_hash + full_name_hash(name, len) % DLM_HASH_BUCKETS;
114         hlist_for_each(list, bucket) {
115                 struct dlm_lock_resource *res = hlist_entry(list,
116                         struct dlm_lock_resource, hash_node);
117                 if (res->lockname.name[0] != name[0])
118                         continue;
119                 if (unlikely(res->lockname.len != len))
120                         continue;
121                 if (memcmp(res->lockname.name + 1, name + 1, len - 1))
122                         continue;
123                 dlm_lockres_get(res);
124                 return res;
125         }
126         return NULL;
127 }
128
129 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
130                                     const char *name,
131                                     unsigned int len)
132 {
133         struct dlm_lock_resource *res;
134         unsigned int hash = dlm_lockid_hash(name, len);
135
136         spin_lock(&dlm->spinlock);
137         res = __dlm_lookup_lockres(dlm, name, len, hash);
138         spin_unlock(&dlm->spinlock);
139         return res;
140 }
141
142 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
143 {
144         struct dlm_ctxt *tmp = NULL;
145         struct list_head *iter;
146
147         assert_spin_locked(&dlm_domain_lock);
148
149         /* tmp->name here is always NULL terminated,
150          * but domain may not be! */
151         list_for_each(iter, &dlm_domains) {
152                 tmp = list_entry (iter, struct dlm_ctxt, list);
153                 if (strlen(tmp->name) == len &&
154                     memcmp(tmp->name, domain, len)==0)
155                         break;
156                 tmp = NULL;
157         }
158
159         return tmp;
160 }
161
162 /* For null terminated domain strings ONLY */
163 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
164 {
165         assert_spin_locked(&dlm_domain_lock);
166
167         return __dlm_lookup_domain_full(domain, strlen(domain));
168 }
169
170
171 /* returns true on one of two conditions:
172  * 1) the domain does not exist
173  * 2) the domain exists and it's state is "joined" */
174 static int dlm_wait_on_domain_helper(const char *domain)
175 {
176         int ret = 0;
177         struct dlm_ctxt *tmp = NULL;
178
179         spin_lock(&dlm_domain_lock);
180
181         tmp = __dlm_lookup_domain(domain);
182         if (!tmp)
183                 ret = 1;
184         else if (tmp->dlm_state == DLM_CTXT_JOINED)
185                 ret = 1;
186
187         spin_unlock(&dlm_domain_lock);
188         return ret;
189 }
190
191 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
192 {
193         if (dlm->lockres_hash)
194                 free_page((unsigned long) dlm->lockres_hash);
195
196         if (dlm->name)
197                 kfree(dlm->name);
198
199         kfree(dlm);
200 }
201
202 /* A little strange - this function will be called while holding
203  * dlm_domain_lock and is expected to be holding it on the way out. We
204  * will however drop and reacquire it multiple times */
205 static void dlm_ctxt_release(struct kref *kref)
206 {
207         struct dlm_ctxt *dlm;
208
209         dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
210
211         BUG_ON(dlm->num_joins);
212         BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
213
214         /* we may still be in the list if we hit an error during join. */
215         list_del_init(&dlm->list);
216
217         spin_unlock(&dlm_domain_lock);
218
219         mlog(0, "freeing memory from domain %s\n", dlm->name);
220
221         wake_up(&dlm_domain_events);
222
223         dlm_free_ctxt_mem(dlm);
224
225         spin_lock(&dlm_domain_lock);
226 }
227
228 void dlm_put(struct dlm_ctxt *dlm)
229 {
230         spin_lock(&dlm_domain_lock);
231         kref_put(&dlm->dlm_refs, dlm_ctxt_release);
232         spin_unlock(&dlm_domain_lock);
233 }
234
235 static void __dlm_get(struct dlm_ctxt *dlm)
236 {
237         kref_get(&dlm->dlm_refs);
238 }
239
240 /* given a questionable reference to a dlm object, gets a reference if
241  * it can find it in the list, otherwise returns NULL in which case
242  * you shouldn't trust your pointer. */
243 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
244 {
245         struct list_head *iter;
246         struct dlm_ctxt *target = NULL;
247
248         spin_lock(&dlm_domain_lock);
249
250         list_for_each(iter, &dlm_domains) {
251                 target = list_entry (iter, struct dlm_ctxt, list);
252
253                 if (target == dlm) {
254                         __dlm_get(target);
255                         break;
256                 }
257
258                 target = NULL;
259         }
260
261         spin_unlock(&dlm_domain_lock);
262
263         return target;
264 }
265
266 int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
267 {
268         int ret;
269
270         spin_lock(&dlm_domain_lock);
271         ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
272                 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
273         spin_unlock(&dlm_domain_lock);
274
275         return ret;
276 }
277
278 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
279 {
280         dlm_unregister_domain_handlers(dlm);
281         dlm_complete_thread(dlm);
282         dlm_complete_recovery_thread(dlm);
283
284         /* We've left the domain. Now we can take ourselves out of the
285          * list and allow the kref stuff to help us free the
286          * memory. */
287         spin_lock(&dlm_domain_lock);
288         list_del_init(&dlm->list);
289         spin_unlock(&dlm_domain_lock);
290
291         /* Wake up anyone waiting for us to remove this domain */
292         wake_up(&dlm_domain_events);
293 }
294
295 static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
296 {
297         int i;
298         struct dlm_lock_resource *res;
299
300         mlog(0, "Migrating locks from domain %s\n", dlm->name);
301 restart:
302         spin_lock(&dlm->spinlock);
303         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
304                 while (!hlist_empty(&dlm->lockres_hash[i])) {
305                         res = hlist_entry(dlm->lockres_hash[i].first,
306                                           struct dlm_lock_resource, hash_node);
307                         /* need reference when manually grabbing lockres */
308                         dlm_lockres_get(res);
309                         /* this should unhash the lockres
310                          * and exit with dlm->spinlock */
311                         mlog(0, "purging res=%p\n", res);
312                         if (dlm_lockres_is_dirty(dlm, res)) {
313                                 /* HACK!  this should absolutely go.
314                                  * need to figure out why some empty
315                                  * lockreses are still marked dirty */
316                                 mlog(ML_ERROR, "lockres %.*s dirty!\n",
317                                      res->lockname.len, res->lockname.name);
318
319                                 spin_unlock(&dlm->spinlock);
320                                 dlm_kick_thread(dlm, res);
321                                 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
322                                 dlm_lockres_put(res);
323                                 goto restart;
324                         }
325                         dlm_purge_lockres(dlm, res);
326                         dlm_lockres_put(res);
327                 }
328         }
329         spin_unlock(&dlm->spinlock);
330
331         mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
332 }
333
334 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
335 {
336         int ret;
337
338         spin_lock(&dlm->spinlock);
339         ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
340         spin_unlock(&dlm->spinlock);
341
342         return ret;
343 }
344
345 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
346 {
347         /* Yikes, a double spinlock! I need domain_lock for the dlm
348          * state and the dlm spinlock for join state... Sorry! */
349 again:
350         spin_lock(&dlm_domain_lock);
351         spin_lock(&dlm->spinlock);
352
353         if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
354                 mlog(0, "Node %d is joining, we wait on it.\n",
355                           dlm->joining_node);
356                 spin_unlock(&dlm->spinlock);
357                 spin_unlock(&dlm_domain_lock);
358
359                 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
360                 goto again;
361         }
362
363         dlm->dlm_state = DLM_CTXT_LEAVING;
364         spin_unlock(&dlm->spinlock);
365         spin_unlock(&dlm_domain_lock);
366 }
367
368 static void __dlm_print_nodes(struct dlm_ctxt *dlm)
369 {
370         int node = -1;
371
372         assert_spin_locked(&dlm->spinlock);
373
374         mlog(ML_NOTICE, "Nodes in my domain (\"%s\"):\n", dlm->name);
375
376         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
377                                      node + 1)) < O2NM_MAX_NODES) {
378                 mlog(ML_NOTICE, " node %d\n", node);
379         }
380 }
381
382 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data)
383 {
384         struct dlm_ctxt *dlm = data;
385         unsigned int node;
386         struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
387
388         mlog_entry("%p %u %p", msg, len, data);
389
390         if (!dlm_grab(dlm))
391                 return 0;
392
393         node = exit_msg->node_idx;
394
395         mlog(0, "Node %u leaves domain %s\n", node, dlm->name);
396
397         spin_lock(&dlm->spinlock);
398         clear_bit(node, dlm->domain_map);
399         __dlm_print_nodes(dlm);
400
401         /* notify anything attached to the heartbeat events */
402         dlm_hb_event_notify_attached(dlm, node, 0);
403
404         spin_unlock(&dlm->spinlock);
405
406         dlm_put(dlm);
407
408         return 0;
409 }
410
411 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
412                                     unsigned int node)
413 {
414         int status;
415         struct dlm_exit_domain leave_msg;
416
417         mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
418                   node, dlm->name, dlm->node_num);
419
420         memset(&leave_msg, 0, sizeof(leave_msg));
421         leave_msg.node_idx = dlm->node_num;
422
423         status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
424                                     &leave_msg, sizeof(leave_msg), node,
425                                     NULL);
426
427         mlog(0, "status return %d from o2net_send_message\n", status);
428
429         return status;
430 }
431
432
433 static void dlm_leave_domain(struct dlm_ctxt *dlm)
434 {
435         int node, clear_node, status;
436
437         /* At this point we've migrated away all our locks and won't
438          * accept mastership of new ones. The dlm is responsible for
439          * almost nothing now. We make sure not to confuse any joining
440          * nodes and then commence shutdown procedure. */
441
442         spin_lock(&dlm->spinlock);
443         /* Clear ourselves from the domain map */
444         clear_bit(dlm->node_num, dlm->domain_map);
445         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
446                                      0)) < O2NM_MAX_NODES) {
447                 /* Drop the dlm spinlock. This is safe wrt the domain_map.
448                  * -nodes cannot be added now as the
449                  *   query_join_handlers knows to respond with OK_NO_MAP
450                  * -we catch the right network errors if a node is
451                  *   removed from the map while we're sending him the
452                  *   exit message. */
453                 spin_unlock(&dlm->spinlock);
454
455                 clear_node = 1;
456
457                 status = dlm_send_one_domain_exit(dlm, node);
458                 if (status < 0 &&
459                     status != -ENOPROTOOPT &&
460                     status != -ENOTCONN) {
461                         mlog(ML_NOTICE, "Error %d sending domain exit message "
462                              "to node %d\n", status, node);
463
464                         /* Not sure what to do here but lets sleep for
465                          * a bit in case this was a transient
466                          * error... */
467                         msleep(DLM_DOMAIN_BACKOFF_MS);
468                         clear_node = 0;
469                 }
470
471                 spin_lock(&dlm->spinlock);
472                 /* If we're not clearing the node bit then we intend
473                  * to loop back around to try again. */
474                 if (clear_node)
475                         clear_bit(node, dlm->domain_map);
476         }
477         spin_unlock(&dlm->spinlock);
478 }
479
480 int dlm_joined(struct dlm_ctxt *dlm)
481 {
482         int ret = 0;
483
484         spin_lock(&dlm_domain_lock);
485
486         if (dlm->dlm_state == DLM_CTXT_JOINED)
487                 ret = 1;
488
489         spin_unlock(&dlm_domain_lock);
490
491         return ret;
492 }
493
494 int dlm_shutting_down(struct dlm_ctxt *dlm)
495 {
496         int ret = 0;
497
498         spin_lock(&dlm_domain_lock);
499
500         if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
501                 ret = 1;
502
503         spin_unlock(&dlm_domain_lock);
504
505         return ret;
506 }
507
508 void dlm_unregister_domain(struct dlm_ctxt *dlm)
509 {
510         int leave = 0;
511
512         spin_lock(&dlm_domain_lock);
513         BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
514         BUG_ON(!dlm->num_joins);
515
516         dlm->num_joins--;
517         if (!dlm->num_joins) {
518                 /* We mark it "in shutdown" now so new register
519                  * requests wait until we've completely left the
520                  * domain. Don't use DLM_CTXT_LEAVING yet as we still
521                  * want new domain joins to communicate with us at
522                  * least until we've completed migration of our
523                  * resources. */
524                 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
525                 leave = 1;
526         }
527         spin_unlock(&dlm_domain_lock);
528
529         if (leave) {
530                 mlog(0, "shutting down domain %s\n", dlm->name);
531
532                 /* We changed dlm state, notify the thread */
533                 dlm_kick_thread(dlm, NULL);
534
535                 dlm_migrate_all_locks(dlm);
536                 dlm_mark_domain_leaving(dlm);
537                 dlm_leave_domain(dlm);
538                 dlm_complete_dlm_shutdown(dlm);
539         }
540         dlm_put(dlm);
541 }
542 EXPORT_SYMBOL_GPL(dlm_unregister_domain);
543
544 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
545 {
546         struct dlm_query_join_request *query;
547         enum dlm_query_join_response response;
548         struct dlm_ctxt *dlm = NULL;
549
550         query = (struct dlm_query_join_request *) msg->buf;
551
552         mlog(0, "node %u wants to join domain %s\n", query->node_idx,
553                   query->domain);
554
555         /*
556          * If heartbeat doesn't consider the node live, tell it
557          * to back off and try again.  This gives heartbeat a chance
558          * to catch up.
559          */
560         if (!o2hb_check_node_heartbeating(query->node_idx)) {
561                 mlog(0, "node %u is not in our live map yet\n",
562                      query->node_idx);
563
564                 response = JOIN_DISALLOW;
565                 goto respond;
566         }
567
568         response = JOIN_OK_NO_MAP;
569
570         spin_lock(&dlm_domain_lock);
571         dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
572         /* Once the dlm ctxt is marked as leaving then we don't want
573          * to be put in someone's domain map. 
574          * Also, explicitly disallow joining at certain troublesome
575          * times (ie. during recovery). */
576         if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
577                 int bit = query->node_idx;
578                 spin_lock(&dlm->spinlock);
579
580                 if (dlm->dlm_state == DLM_CTXT_NEW &&
581                     dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
582                         /*If this is a brand new context and we
583                          * haven't started our join process yet, then
584                          * the other node won the race. */
585                         response = JOIN_OK_NO_MAP;
586                 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
587                         /* Disallow parallel joins. */
588                         response = JOIN_DISALLOW;
589                 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
590                         mlog(ML_NOTICE, "node %u trying to join, but recovery "
591                              "is ongoing.\n", bit);
592                         response = JOIN_DISALLOW;
593                 } else if (test_bit(bit, dlm->recovery_map)) {
594                         mlog(ML_NOTICE, "node %u trying to join, but it "
595                              "still needs recovery.\n", bit);
596                         response = JOIN_DISALLOW;
597                 } else if (test_bit(bit, dlm->domain_map)) {
598                         mlog(ML_NOTICE, "node %u trying to join, but it "
599                              "is still in the domain! needs recovery?\n",
600                              bit);
601                         response = JOIN_DISALLOW;
602                 } else {
603                         /* Alright we're fully a part of this domain
604                          * so we keep some state as to who's joining
605                          * and indicate to him that needs to be fixed
606                          * up. */
607                         response = JOIN_OK;
608                         __dlm_set_joining_node(dlm, query->node_idx);
609                 }
610
611                 spin_unlock(&dlm->spinlock);
612         }
613         spin_unlock(&dlm_domain_lock);
614
615 respond:
616         mlog(0, "We respond with %u\n", response);
617
618         return response;
619 }
620
621 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
622 {
623         struct dlm_assert_joined *assert;
624         struct dlm_ctxt *dlm = NULL;
625
626         assert = (struct dlm_assert_joined *) msg->buf;
627
628         mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
629                   assert->domain);
630
631         spin_lock(&dlm_domain_lock);
632         dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
633         /* XXX should we consider no dlm ctxt an error? */
634         if (dlm) {
635                 spin_lock(&dlm->spinlock);
636
637                 /* Alright, this node has officially joined our
638                  * domain. Set him in the map and clean up our
639                  * leftover join state. */
640                 BUG_ON(dlm->joining_node != assert->node_idx);
641                 set_bit(assert->node_idx, dlm->domain_map);
642                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
643
644                 __dlm_print_nodes(dlm);
645
646                 /* notify anything attached to the heartbeat events */
647                 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
648
649                 spin_unlock(&dlm->spinlock);
650         }
651         spin_unlock(&dlm_domain_lock);
652
653         return 0;
654 }
655
656 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data)
657 {
658         struct dlm_cancel_join *cancel;
659         struct dlm_ctxt *dlm = NULL;
660
661         cancel = (struct dlm_cancel_join *) msg->buf;
662
663         mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
664                   cancel->domain);
665
666         spin_lock(&dlm_domain_lock);
667         dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
668
669         if (dlm) {
670                 spin_lock(&dlm->spinlock);
671
672                 /* Yikes, this guy wants to cancel his join. No
673                  * problem, we simply cleanup our join state. */
674                 BUG_ON(dlm->joining_node != cancel->node_idx);
675                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
676
677                 spin_unlock(&dlm->spinlock);
678         }
679         spin_unlock(&dlm_domain_lock);
680
681         return 0;
682 }
683
684 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
685                                     unsigned int node)
686 {
687         int status;
688         struct dlm_cancel_join cancel_msg;
689
690         memset(&cancel_msg, 0, sizeof(cancel_msg));
691         cancel_msg.node_idx = dlm->node_num;
692         cancel_msg.name_len = strlen(dlm->name);
693         memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
694
695         status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
696                                     &cancel_msg, sizeof(cancel_msg), node,
697                                     NULL);
698         if (status < 0) {
699                 mlog_errno(status);
700                 goto bail;
701         }
702
703 bail:
704         return status;
705 }
706
707 /* map_size should be in bytes. */
708 static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
709                                  unsigned long *node_map,
710                                  unsigned int map_size)
711 {
712         int status, tmpstat;
713         unsigned int node;
714
715         if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
716                          sizeof(unsigned long))) {
717                 mlog(ML_ERROR,
718                      "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
719                      map_size, BITS_TO_LONGS(O2NM_MAX_NODES));
720                 return -EINVAL;
721         }
722
723         status = 0;
724         node = -1;
725         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
726                                      node + 1)) < O2NM_MAX_NODES) {
727                 if (node == dlm->node_num)
728                         continue;
729
730                 tmpstat = dlm_send_one_join_cancel(dlm, node);
731                 if (tmpstat) {
732                         mlog(ML_ERROR, "Error return %d cancelling join on "
733                              "node %d\n", tmpstat, node);
734                         if (!status)
735                                 status = tmpstat;
736                 }
737         }
738
739         if (status)
740                 mlog_errno(status);
741         return status;
742 }
743
744 static int dlm_request_join(struct dlm_ctxt *dlm,
745                             int node,
746                             enum dlm_query_join_response *response)
747 {
748         int status, retval;
749         struct dlm_query_join_request join_msg;
750
751         mlog(0, "querying node %d\n", node);
752
753         memset(&join_msg, 0, sizeof(join_msg));
754         join_msg.node_idx = dlm->node_num;
755         join_msg.name_len = strlen(dlm->name);
756         memcpy(join_msg.domain, dlm->name, join_msg.name_len);
757
758         status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
759                                     sizeof(join_msg), node, &retval);
760         if (status < 0 && status != -ENOPROTOOPT) {
761                 mlog_errno(status);
762                 goto bail;
763         }
764
765         /* -ENOPROTOOPT from the net code means the other side isn't
766             listening for our message type -- that's fine, it means
767             his dlm isn't up, so we can consider him a 'yes' but not
768             joined into the domain.  */
769         if (status == -ENOPROTOOPT) {
770                 status = 0;
771                 *response = JOIN_OK_NO_MAP;
772         } else if (retval == JOIN_DISALLOW ||
773                    retval == JOIN_OK ||
774                    retval == JOIN_OK_NO_MAP) {
775                 *response = retval;
776         } else {
777                 status = -EINVAL;
778                 mlog(ML_ERROR, "invalid response %d from node %u\n", retval,
779                      node);
780         }
781
782         mlog(0, "status %d, node %d response is %d\n", status, node,
783                   *response);
784
785 bail:
786         return status;
787 }
788
789 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
790                                     unsigned int node)
791 {
792         int status;
793         struct dlm_assert_joined assert_msg;
794
795         mlog(0, "Sending join assert to node %u\n", node);
796
797         memset(&assert_msg, 0, sizeof(assert_msg));
798         assert_msg.node_idx = dlm->node_num;
799         assert_msg.name_len = strlen(dlm->name);
800         memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
801
802         status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
803                                     &assert_msg, sizeof(assert_msg), node,
804                                     NULL);
805         if (status < 0)
806                 mlog_errno(status);
807
808         return status;
809 }
810
811 static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
812                                   unsigned long *node_map)
813 {
814         int status, node, live;
815
816         status = 0;
817         node = -1;
818         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
819                                      node + 1)) < O2NM_MAX_NODES) {
820                 if (node == dlm->node_num)
821                         continue;
822
823                 do {
824                         /* It is very important that this message be
825                          * received so we spin until either the node
826                          * has died or it gets the message. */
827                         status = dlm_send_one_join_assert(dlm, node);
828
829                         spin_lock(&dlm->spinlock);
830                         live = test_bit(node, dlm->live_nodes_map);
831                         spin_unlock(&dlm->spinlock);
832
833                         if (status) {
834                                 mlog(ML_ERROR, "Error return %d asserting "
835                                      "join on node %d\n", status, node);
836
837                                 /* give us some time between errors... */
838                                 if (live)
839                                         msleep(DLM_DOMAIN_BACKOFF_MS);
840                         }
841                 } while (status && live);
842         }
843 }
844
845 struct domain_join_ctxt { /* scratch node bitmaps used during one attempt to join a domain */
846         unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; /* copy of dlm->live_nodes_map taken at the start of the attempt; compared against the current map to detect changes */
847         unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; /* bit set for each node that answered JOIN_OK */
848 };
849
850 static int dlm_should_restart_join(struct dlm_ctxt *dlm,
851                                    struct domain_join_ctxt *ctxt,
852                                    enum dlm_query_join_response response)
853 {
854         int ret;
855
856         if (response == JOIN_DISALLOW) {
857                 mlog(0, "Latest response of disallow -- should restart\n");
858                 return 1;
859         }
860
861         spin_lock(&dlm->spinlock);
862         /* For now, we restart the process if the node maps have
863          * changed at all */
864         ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
865                      sizeof(dlm->live_nodes_map));
866         spin_unlock(&dlm->spinlock);
867
868         if (ret)
869                 mlog(0, "Node maps changed -- should restart\n");
870
871         return ret;
872 }
873
874 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
875 {
876         int status = 0, tmpstat, node;
877         struct domain_join_ctxt *ctxt;
878         enum dlm_query_join_response response;
879
880         mlog_entry("%p", dlm);
881
882         ctxt = kcalloc(1, sizeof(*ctxt), GFP_KERNEL);
883         if (!ctxt) {
884                 status = -ENOMEM;
885                 mlog_errno(status);
886                 goto bail;
887         }
888
889         /* group sem locking should work for us here -- we're already
890          * registered for heartbeat events so filling this should be
891          * atomic wrt getting those handlers called. */
892         o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
893
894         spin_lock(&dlm->spinlock);
895         memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
896
897         __dlm_set_joining_node(dlm, dlm->node_num);
898
899         spin_unlock(&dlm->spinlock);
900
901         node = -1;
902         while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
903                                      node + 1)) < O2NM_MAX_NODES) {
904                 if (node == dlm->node_num)
905                         continue;
906
907                 status = dlm_request_join(dlm, node, &response);
908                 if (status < 0) {
909                         mlog_errno(status);
910                         goto bail;
911                 }
912
913                 /* Ok, either we got a response or the node doesn't have a
914                  * dlm up. */
915                 if (response == JOIN_OK)
916                         set_bit(node, ctxt->yes_resp_map);
917
918                 if (dlm_should_restart_join(dlm, ctxt, response)) {
919                         status = -EAGAIN;
920                         goto bail;
921                 }
922         }
923
924         mlog(0, "Yay, done querying nodes!\n");
925
926         /* Yay, everyone agree's we can join the domain. My domain is
927          * comprised of all nodes who were put in the
928          * yes_resp_map. Copy that into our domain map and send a join
929          * assert message to clean up everyone elses state. */
930         spin_lock(&dlm->spinlock);
931         memcpy(dlm->domain_map, ctxt->yes_resp_map,
932                sizeof(ctxt->yes_resp_map));
933         set_bit(dlm->node_num, dlm->domain_map);
934         spin_unlock(&dlm->spinlock);
935
936         dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
937
938         /* Joined state *must* be set before the joining node
939          * information, otherwise the query_join handler may read no
940          * current joiner but a state of NEW and tell joining nodes
941          * we're not in the domain. */
942         spin_lock(&dlm_domain_lock);
943         dlm->dlm_state = DLM_CTXT_JOINED;
944         dlm->num_joins++;
945         spin_unlock(&dlm_domain_lock);
946
947 bail:
948         spin_lock(&dlm->spinlock);
949         __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
950         if (!status)
951                 __dlm_print_nodes(dlm);
952         spin_unlock(&dlm->spinlock);
953
954         if (ctxt) {
955                 /* Do we need to send a cancel message to any nodes? */
956                 if (status < 0) {
957                         tmpstat = dlm_send_join_cancels(dlm,
958                                                         ctxt->yes_resp_map,
959                                                         sizeof(ctxt->yes_resp_map));
960                         if (tmpstat < 0)
961                                 mlog_errno(tmpstat);
962                 }
963                 kfree(ctxt);
964         }
965
966         mlog(0, "returning %d\n", status);
967         return status;
968 }
969
970 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
971 {
972         o2hb_unregister_callback(&dlm->dlm_hb_up);
973         o2hb_unregister_callback(&dlm->dlm_hb_down);
974         o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
975 }
976
977 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
978 {
979         int status;
980
981         mlog(0, "registering handlers.\n");
982
983         o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
984                             dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
985         status = o2hb_register_callback(&dlm->dlm_hb_down);
986         if (status)
987                 goto bail;
988
989         o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
990                             dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
991         status = o2hb_register_callback(&dlm->dlm_hb_up);
992         if (status)
993                 goto bail;
994
995         status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
996                                         sizeof(struct dlm_master_request),
997                                         dlm_master_request_handler,
998                                         dlm, &dlm->dlm_domain_handlers);
999         if (status)
1000                 goto bail;
1001
1002         status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1003                                         sizeof(struct dlm_assert_master),
1004                                         dlm_assert_master_handler,
1005                                         dlm, &dlm->dlm_domain_handlers);
1006         if (status)
1007                 goto bail;
1008
1009         status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1010                                         sizeof(struct dlm_create_lock),
1011                                         dlm_create_lock_handler,
1012                                         dlm, &dlm->dlm_domain_handlers);
1013         if (status)
1014                 goto bail;
1015
1016         status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1017                                         DLM_CONVERT_LOCK_MAX_LEN,
1018                                         dlm_convert_lock_handler,
1019                                         dlm, &dlm->dlm_domain_handlers);
1020         if (status)
1021                 goto bail;
1022
1023         status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1024                                         DLM_UNLOCK_LOCK_MAX_LEN,
1025                                         dlm_unlock_lock_handler,
1026                                         dlm, &dlm->dlm_domain_handlers);
1027         if (status)
1028                 goto bail;
1029
1030         status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1031                                         DLM_PROXY_AST_MAX_LEN,
1032                                         dlm_proxy_ast_handler,
1033                                         dlm, &dlm->dlm_domain_handlers);
1034         if (status)
1035                 goto bail;
1036
1037         status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1038                                         sizeof(struct dlm_exit_domain),
1039                                         dlm_exit_domain_handler,
1040                                         dlm, &dlm->dlm_domain_handlers);
1041         if (status)
1042                 goto bail;
1043
1044         status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1045                                         sizeof(struct dlm_migrate_request),
1046                                         dlm_migrate_request_handler,
1047                                         dlm, &dlm->dlm_domain_handlers);
1048         if (status)
1049                 goto bail;
1050
1051         status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1052                                         DLM_MIG_LOCKRES_MAX_LEN,
1053                                         dlm_mig_lockres_handler,
1054                                         dlm, &dlm->dlm_domain_handlers);
1055         if (status)
1056                 goto bail;
1057
1058         status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1059                                         sizeof(struct dlm_master_requery),
1060                                         dlm_master_requery_handler,
1061                                         dlm, &dlm->dlm_domain_handlers);
1062         if (status)
1063                 goto bail;
1064
1065         status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1066                                         sizeof(struct dlm_lock_request),
1067                                         dlm_request_all_locks_handler,
1068                                         dlm, &dlm->dlm_domain_handlers);
1069         if (status)
1070                 goto bail;
1071
1072         status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1073                                         sizeof(struct dlm_reco_data_done),
1074                                         dlm_reco_data_done_handler,
1075                                         dlm, &dlm->dlm_domain_handlers);
1076         if (status)
1077                 goto bail;
1078
1079         status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1080                                         sizeof(struct dlm_begin_reco),
1081                                         dlm_begin_reco_handler,
1082                                         dlm, &dlm->dlm_domain_handlers);
1083         if (status)
1084                 goto bail;
1085
1086         status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1087                                         sizeof(struct dlm_finalize_reco),
1088                                         dlm_finalize_reco_handler,
1089                                         dlm, &dlm->dlm_domain_handlers);
1090         if (status)
1091                 goto bail;
1092
1093 bail:
1094         if (status)
1095                 dlm_unregister_domain_handlers(dlm);
1096
1097         return status;
1098 }
1099
1100 static int dlm_join_domain(struct dlm_ctxt *dlm)
1101 {
1102         int status;
1103
1104         BUG_ON(!dlm);
1105
1106         mlog(0, "Join domain %s\n", dlm->name);
1107
1108         status = dlm_register_domain_handlers(dlm);
1109         if (status) {
1110                 mlog_errno(status);
1111                 goto bail;
1112         }
1113
1114         status = dlm_launch_thread(dlm);
1115         if (status < 0) {
1116                 mlog_errno(status);
1117                 goto bail;
1118         }
1119
1120         status = dlm_launch_recovery_thread(dlm);
1121         if (status < 0) {
1122                 mlog_errno(status);
1123                 goto bail;
1124         }
1125
1126         do {
1127                 unsigned int backoff;
1128                 status = dlm_try_to_join_domain(dlm);
1129
1130                 /* If we're racing another node to the join, then we
1131                  * need to back off temporarily and let them
1132                  * complete. */
1133                 if (status == -EAGAIN) {
1134                         if (signal_pending(current)) {
1135                                 status = -ERESTARTSYS;
1136                                 goto bail;
1137                         }
1138
1139                         /*
1140                          * <chip> After you!
1141                          * <dale> No, after you!
1142                          * <chip> I insist!
1143                          * <dale> But you first!
1144                          * ...
1145                          */
1146                         backoff = (unsigned int)(jiffies & 0x3);
1147                         backoff *= DLM_DOMAIN_BACKOFF_MS;
1148                         mlog(0, "backoff %d\n", backoff);
1149                         msleep(backoff);
1150                 }
1151         } while (status == -EAGAIN);
1152
1153         if (status < 0) {
1154                 mlog_errno(status);
1155                 goto bail;
1156         }
1157
1158         status = 0;
1159 bail:
1160         wake_up(&dlm_domain_events);
1161
1162         if (status) {
1163                 dlm_unregister_domain_handlers(dlm);
1164                 dlm_complete_thread(dlm);
1165                 dlm_complete_recovery_thread(dlm);
1166         }
1167
1168         return status;
1169 }
1170
1171 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1172                                 u32 key)
1173 {
1174         int i;
1175         struct dlm_ctxt *dlm = NULL;
1176
1177         dlm = kcalloc(1, sizeof(*dlm), GFP_KERNEL);
1178         if (!dlm) {
1179                 mlog_errno(-ENOMEM);
1180                 goto leave;
1181         }
1182
1183         dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
1184         if (dlm->name == NULL) {
1185                 mlog_errno(-ENOMEM);
1186                 kfree(dlm);
1187                 dlm = NULL;
1188                 goto leave;
1189         }
1190
1191         dlm->lockres_hash = (struct hlist_head *) __get_free_page(GFP_KERNEL);
1192         if (!dlm->lockres_hash) {
1193                 mlog_errno(-ENOMEM);
1194                 kfree(dlm->name);
1195                 kfree(dlm);
1196                 dlm = NULL;
1197                 goto leave;
1198         }
1199
1200         for (i=0; i<DLM_HASH_BUCKETS; i++)
1201                 INIT_HLIST_HEAD(&dlm->lockres_hash[i]);
1202
1203         strcpy(dlm->name, domain);
1204         dlm->key = key;
1205         dlm->node_num = o2nm_this_node();
1206
1207         spin_lock_init(&dlm->spinlock);
1208         spin_lock_init(&dlm->master_lock);
1209         spin_lock_init(&dlm->ast_lock);
1210         INIT_LIST_HEAD(&dlm->list);
1211         INIT_LIST_HEAD(&dlm->dirty_list);
1212         INIT_LIST_HEAD(&dlm->reco.resources);
1213         INIT_LIST_HEAD(&dlm->reco.received);
1214         INIT_LIST_HEAD(&dlm->reco.node_data);
1215         INIT_LIST_HEAD(&dlm->purge_list);
1216         INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
1217         dlm->reco.state = 0;
1218
1219         INIT_LIST_HEAD(&dlm->pending_asts);
1220         INIT_LIST_HEAD(&dlm->pending_basts);
1221
1222         mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1223                   dlm->recovery_map, &(dlm->recovery_map[0]));
1224
1225         memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1226         memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1227         memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1228
1229         dlm->dlm_thread_task = NULL;
1230         dlm->dlm_reco_thread_task = NULL;
1231         init_waitqueue_head(&dlm->dlm_thread_wq);
1232         init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1233         init_waitqueue_head(&dlm->reco.event);
1234         init_waitqueue_head(&dlm->ast_wq);
1235         init_waitqueue_head(&dlm->migration_wq);
1236         INIT_LIST_HEAD(&dlm->master_list);
1237         INIT_LIST_HEAD(&dlm->mle_hb_events);
1238
1239         dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1240         init_waitqueue_head(&dlm->dlm_join_events);
1241
1242         dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1243         dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
1244         atomic_set(&dlm->local_resources, 0);
1245         atomic_set(&dlm->remote_resources, 0);
1246         atomic_set(&dlm->unknown_resources, 0);
1247
1248         spin_lock_init(&dlm->work_lock);
1249         INIT_LIST_HEAD(&dlm->work_list);
1250         INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work, dlm);
1251
1252         kref_init(&dlm->dlm_refs);
1253         dlm->dlm_state = DLM_CTXT_NEW;
1254
1255         INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
1256
1257         mlog(0, "context init: refcount %u\n",
1258                   atomic_read(&dlm->dlm_refs.refcount));
1259
1260 leave:
1261         return dlm;
1262 }
1263
1264 /*
1265  * dlm_register_domain: one-time setup per "domain"
1266  */
1267 struct dlm_ctxt * dlm_register_domain(const char *domain,
1268                                u32 key)
1269 {
1270         int ret;
1271         struct dlm_ctxt *dlm = NULL;
1272         struct dlm_ctxt *new_ctxt = NULL;
1273
1274         if (strlen(domain) > O2NM_MAX_NAME_LEN) {
1275                 ret = -ENAMETOOLONG;
1276                 mlog(ML_ERROR, "domain name length too long\n");
1277                 goto leave;
1278         }
1279
1280         if (!o2hb_check_local_node_heartbeating()) {
1281                 mlog(ML_ERROR, "the local node has not been configured, or is "
1282                      "not heartbeating\n");
1283                 ret = -EPROTO;
1284                 goto leave;
1285         }
1286
1287         mlog(0, "register called for domain \"%s\"\n", domain);
1288
1289 retry:
1290         dlm = NULL;
1291         if (signal_pending(current)) {
1292                 ret = -ERESTARTSYS;
1293                 mlog_errno(ret);
1294                 goto leave;
1295         }
1296
1297         spin_lock(&dlm_domain_lock);
1298
1299         dlm = __dlm_lookup_domain(domain);
1300         if (dlm) {
1301                 if (dlm->dlm_state != DLM_CTXT_JOINED) {
1302                         spin_unlock(&dlm_domain_lock);
1303
1304                         mlog(0, "This ctxt is not joined yet!\n");
1305                         wait_event_interruptible(dlm_domain_events,
1306                                                  dlm_wait_on_domain_helper(
1307                                                          domain));
1308                         goto retry;
1309                 }
1310
1311                 __dlm_get(dlm);
1312                 dlm->num_joins++;
1313
1314                 spin_unlock(&dlm_domain_lock);
1315
1316                 ret = 0;
1317                 goto leave;
1318         }
1319
1320         /* doesn't exist */
1321         if (!new_ctxt) {
1322                 spin_unlock(&dlm_domain_lock);
1323
1324                 new_ctxt = dlm_alloc_ctxt(domain, key);
1325                 if (new_ctxt)
1326                         goto retry;
1327
1328                 ret = -ENOMEM;
1329                 mlog_errno(ret);
1330                 goto leave;
1331         }
1332
1333         /* a little variable switch-a-roo here... */
1334         dlm = new_ctxt;
1335         new_ctxt = NULL;
1336
1337         /* add the new domain */
1338         list_add_tail(&dlm->list, &dlm_domains);
1339         spin_unlock(&dlm_domain_lock);
1340
1341         ret = dlm_join_domain(dlm);
1342         if (ret) {
1343                 mlog_errno(ret);
1344                 dlm_put(dlm);
1345                 goto leave;
1346         }
1347
1348         ret = 0;
1349 leave:
1350         if (new_ctxt)
1351                 dlm_free_ctxt_mem(new_ctxt);
1352
1353         if (ret < 0)
1354                 dlm = ERR_PTR(ret);
1355
1356         return dlm;
1357 }
1358 EXPORT_SYMBOL_GPL(dlm_register_domain);
1359
/* Module-wide list of the network handlers registered by
 * dlm_register_net_handlers(). */
static LIST_HEAD(dlm_join_handlers);
1361
1362 static void dlm_unregister_net_handlers(void)
1363 {
1364         o2net_unregister_handler_list(&dlm_join_handlers);
1365 }
1366
1367 static int dlm_register_net_handlers(void)
1368 {
1369         int status = 0;
1370
1371         status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1372                                         sizeof(struct dlm_query_join_request),
1373                                         dlm_query_join_handler,
1374                                         NULL, &dlm_join_handlers);
1375         if (status)
1376                 goto bail;
1377
1378         status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1379                                         sizeof(struct dlm_assert_joined),
1380                                         dlm_assert_joined_handler,
1381                                         NULL, &dlm_join_handlers);
1382         if (status)
1383                 goto bail;
1384
1385         status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1386                                         sizeof(struct dlm_cancel_join),
1387                                         dlm_cancel_join_handler,
1388                                         NULL, &dlm_join_handlers);
1389
1390 bail:
1391         if (status < 0)
1392                 dlm_unregister_net_handlers();
1393
1394         return status;
1395 }
1396
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */

/* Eviction is not expected to happen often, so one lock shared by all
 * domains is sufficient; a per-domain lock is not necessary. Eviction
 * callbacks are allowed to sleep for short periods of time. */
static DECLARE_RWSEM(dlm_callback_sem);
1409
1410 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
1411                                         int node_num)
1412 {
1413         struct list_head *iter;
1414         struct dlm_eviction_cb *cb;
1415
1416         down_read(&dlm_callback_sem);
1417         list_for_each(iter, &dlm->dlm_eviction_callbacks) {
1418                 cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
1419
1420                 cb->ec_func(node_num, cb->ec_data);
1421         }
1422         up_read(&dlm_callback_sem);
1423 }
1424
1425 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
1426                            dlm_eviction_func *f,
1427                            void *data)
1428 {
1429         INIT_LIST_HEAD(&cb->ec_item);
1430         cb->ec_func = f;
1431         cb->ec_data = data;
1432 }
1433 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
1434
1435 void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
1436                               struct dlm_eviction_cb *cb)
1437 {
1438         down_write(&dlm_callback_sem);
1439         list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
1440         up_write(&dlm_callback_sem);
1441 }
1442 EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
1443
1444 void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
1445 {
1446         down_write(&dlm_callback_sem);
1447         list_del_init(&cb->ec_item);
1448         up_write(&dlm_callback_sem);
1449 }
1450 EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
1451
1452 static int __init dlm_init(void)
1453 {
1454         int status;
1455
1456         dlm_print_version();
1457
1458         status = dlm_init_mle_cache();
1459         if (status)
1460                 return -1;
1461
1462         status = dlm_register_net_handlers();
1463         if (status) {
1464                 dlm_destroy_mle_cache();
1465                 return -1;
1466         }
1467
1468         return 0;
1469 }
1470
1471 static void __exit dlm_exit (void)
1472 {
1473         dlm_unregister_net_handlers();
1474         dlm_destroy_mle_cache();
1475 }
1476
1477 MODULE_AUTHOR("Oracle");
1478 MODULE_LICENSE("GPL");
1479
1480 module_init(dlm_init);
1481 module_exit(dlm_exit);