fs/gfs2/glock.c
1 /*
2  * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
3  * Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
4  *
5  * This copyrighted material is made available to anyone wishing to use,
6  * modify, copy, or redistribute it subject to the terms and conditions
7  * of the GNU General Public License version 2.
8  */
9
10 #include <linux/sched.h>
11 #include <linux/slab.h>
12 #include <linux/spinlock.h>
13 #include <linux/completion.h>
14 #include <linux/buffer_head.h>
15 #include <linux/delay.h>
16 #include <linux/sort.h>
17 #include <linux/jhash.h>
18 #include <linux/kallsyms.h>
19 #include <linux/gfs2_ondisk.h>
20 #include <linux/list.h>
21 #include <linux/lm_interface.h>
22 #include <linux/wait.h>
23 #include <asm/uaccess.h>
24
25 #include "gfs2.h"
26 #include "incore.h"
27 #include "glock.h"
28 #include "glops.h"
29 #include "inode.h"
30 #include "lm.h"
31 #include "lops.h"
32 #include "meta_io.h"
33 #include "quota.h"
34 #include "super.h"
35 #include "util.h"
36
37 struct gfs2_gl_hash_bucket {
38         struct hlist_head hb_list;
39 };
40
41 typedef void (*glock_examiner) (struct gfs2_glock * gl);
42
43 static int gfs2_dump_lockstate(struct gfs2_sbd *sdp);
44 static int dump_glock(struct gfs2_glock *gl);
45 static int dump_inode(struct gfs2_inode *ip);
46
47 #define GFS2_GL_HASH_SHIFT      15
48 #define GFS2_GL_HASH_SIZE       (1 << GFS2_GL_HASH_SHIFT)
49 #define GFS2_GL_HASH_MASK       (GFS2_GL_HASH_SIZE - 1)
50
51 static struct gfs2_gl_hash_bucket gl_hash_table[GFS2_GL_HASH_SIZE];
52
53 /*
54  * Despite what you might think, the numbers below are not arbitrary :-)
55  * They are taken from the ipv4 routing hash code, which is well tested
56  * and thus should be nearly optimal. Later on we might tweak the numbers
57  * but for now this should be fine.
58  *
59  * The reason for putting the locks in a separate array from the list heads
60  * is that we can have fewer locks than list heads and save memory. We use
61  * the same hash function for both, but with a different hash mask.
62  */
63 #if defined(CONFIG_SMP) || defined(CONFIG_DEBUG_SPINLOCK) || \
64         defined(CONFIG_PROVE_LOCKING)
65
66 #ifdef CONFIG_LOCKDEP
67 # define GL_HASH_LOCK_SZ        256
68 #else
69 # if NR_CPUS >= 32
70 #  define GL_HASH_LOCK_SZ       4096
71 # elif NR_CPUS >= 16
72 #  define GL_HASH_LOCK_SZ       2048
73 # elif NR_CPUS >= 8
74 #  define GL_HASH_LOCK_SZ       1024
75 # elif NR_CPUS >= 4
76 #  define GL_HASH_LOCK_SZ       512
77 # else
78 #  define GL_HASH_LOCK_SZ       256
79 # endif
80 #endif
81
82 /* We never want more locks than chains */
83 #if GFS2_GL_HASH_SIZE < GL_HASH_LOCK_SZ
84 # undef GL_HASH_LOCK_SZ
85 # define GL_HASH_LOCK_SZ GFS2_GL_HASH_SIZE
86 #endif
87
88 static rwlock_t gl_hash_locks[GL_HASH_LOCK_SZ];
89
90 static inline rwlock_t *gl_lock_addr(unsigned int x)
91 {
92         return &gl_hash_locks[x & (GL_HASH_LOCK_SZ-1)];
93 }
94 #else /* not SMP, so no spinlocks required */
95 static inline rwlock_t *gl_lock_addr(unsigned int x)
96 {
97         return NULL;
98 }
99 #endif
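
/*
 * For illustration: gl_lock_addr() masks the bucket hash with the smaller
 * lock mask, so several buckets may share one rwlock but a given bucket
 * always maps to the same one.  The lookup pattern used throughout this
 * file is therefore:
 *
 *      read_lock(gl_lock_addr(hash));
 *      gl = search_bucket(hash, sdp, name);
 *      read_unlock(gl_lock_addr(hash));
 *
 * Insertion and removal take the same lock with write_lock()/write_unlock().
 */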
100
101 /**
102  * relaxed_state_ok - is a requested lock compatible with the current lock mode?
103  * @actual: the current state of the lock
104  * @requested: the lock state that was requested by the caller
105  * @flags: the modifier flags passed in by the caller
106  *
107  * Returns: 1 if the locks are compatible, 0 otherwise
108  */
109
110 static inline int relaxed_state_ok(unsigned int actual, unsigned requested,
111                                    int flags)
112 {
113         if (actual == requested)
114                 return 1;
115
116         if (flags & GL_EXACT)
117                 return 0;
118
119         if (actual == LM_ST_EXCLUSIVE && requested == LM_ST_SHARED)
120                 return 1;
121
122         if (actual != LM_ST_UNLOCKED && (flags & LM_FLAG_ANY))
123                 return 1;
124
125         return 0;
126 }
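
/*
 * A few worked cases of relaxed_state_ok(), for illustration (flags are
 * empty unless shown):
 *
 *      actual=LM_ST_SHARED,    requested=LM_ST_SHARED               -> 1
 *      actual=LM_ST_EXCLUSIVE, requested=LM_ST_SHARED               -> 1
 *      actual=LM_ST_EXCLUSIVE, requested=LM_ST_SHARED,  GL_EXACT    -> 0
 *      actual=LM_ST_SHARED,    requested=LM_ST_EXCLUSIVE            -> 0
 *      actual=LM_ST_DEFERRED,  requested=LM_ST_SHARED,  LM_FLAG_ANY -> 1
 *      actual=LM_ST_UNLOCKED,  requested=LM_ST_SHARED,  LM_FLAG_ANY -> 0
 */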
127
128 /**
129  * gl_hash() - Turn glock number into hash bucket number
130  * @sdp: The GFS2 superblock
131  * @name: The lock name
132  * Returns: The number of the corresponding hash bucket
133  */
134
135 static unsigned int gl_hash(const struct gfs2_sbd *sdp,
136                             const struct lm_lockname *name)
137 {
138         unsigned int h;
139
140         h = jhash(&name->ln_number, sizeof(u64), 0);
141         h = jhash(&name->ln_type, sizeof(unsigned int), h);
142         h = jhash(&sdp, sizeof(struct gfs2_sbd *), h);
143         h &= GFS2_GL_HASH_MASK;
144
145         return h;
146 }
147
148 /**
149  * glock_free() - Perform a few checks and then release struct gfs2_glock
150  * @gl: The glock to release
151  *
152  * Also calls lock module to release its internal structure for this glock.
153  *
154  */
155
156 static void glock_free(struct gfs2_glock *gl)
157 {
158         struct gfs2_sbd *sdp = gl->gl_sbd;
159         struct inode *aspace = gl->gl_aspace;
160
161         gfs2_lm_put_lock(sdp, gl->gl_lock);
162
163         if (aspace)
164                 gfs2_aspace_put(aspace);
165
166         kmem_cache_free(gfs2_glock_cachep, gl);
167 }
168
169 /**
170  * gfs2_glock_hold() - increment reference count on glock
171  * @gl: The glock to hold
172  *
173  */
174
175 void gfs2_glock_hold(struct gfs2_glock *gl)
176 {
177         atomic_inc(&gl->gl_ref);
178 }
179
180 /**
181  * gfs2_glock_put() - Decrement reference count on glock
182  * @gl: The glock to put
183  *
184  */
185
186 int gfs2_glock_put(struct gfs2_glock *gl)
187 {
188         int rv = 0;
189         struct gfs2_sbd *sdp = gl->gl_sbd;
190
191         write_lock(gl_lock_addr(gl->gl_hash));
192         if (atomic_dec_and_test(&gl->gl_ref)) {
193                 hlist_del(&gl->gl_list);
194                 write_unlock(gl_lock_addr(gl->gl_hash));
195                 BUG_ON(spin_is_locked(&gl->gl_spin));
196                 gfs2_assert(sdp, gl->gl_state == LM_ST_UNLOCKED);
197                 gfs2_assert(sdp, list_empty(&gl->gl_reclaim));
198                 gfs2_assert(sdp, list_empty(&gl->gl_holders));
199                 gfs2_assert(sdp, list_empty(&gl->gl_waiters1));
200                 gfs2_assert(sdp, list_empty(&gl->gl_waiters2));
201                 gfs2_assert(sdp, list_empty(&gl->gl_waiters3));
202                 glock_free(gl);
203                 rv = 1;
204                 goto out;
205         }
206         write_unlock(gl_lock_addr(gl->gl_hash));
207 out:
208         return rv;
209 }
210
211 /**
212  * queue_empty - check to see if a glock's queue is empty
213  * @gl: the glock
214  * @head: the head of the queue to check
215  *
216  * This function protects the list in the event that a process already
217  * has a holder on the list and is adding a second holder for itself.
218  * The glmutex lock is what generally prevents processes from working
219  * on the same glock at once, but the special case of adding a second
220  * holder for yourself ("recursive" locking) doesn't involve locking
221  * glmutex, making the spin lock necessary.
222  *
223  * Returns: 1 if the queue is empty
224  */
225
226 static inline int queue_empty(struct gfs2_glock *gl, struct list_head *head)
227 {
228         int empty;
229         spin_lock(&gl->gl_spin);
230         empty = list_empty(head);
231         spin_unlock(&gl->gl_spin);
232         return empty;
233 }
234
235 /**
236  * search_bucket() - Find struct gfs2_glock by lock number
237  * @hash: the hash bucket to search
238  * @sdp: the filesystem; @name: The lock name
239  *
240  * Returns: NULL, or the struct gfs2_glock with the requested number
241  */
242
243 static struct gfs2_glock *search_bucket(unsigned int hash,
244                                         const struct gfs2_sbd *sdp,
245                                         const struct lm_lockname *name)
246 {
247         struct gfs2_glock *gl;
248         struct hlist_node *h;
249
250         hlist_for_each_entry(gl, h, &gl_hash_table[hash].hb_list, gl_list) {
251                 if (!lm_name_equal(&gl->gl_name, name))
252                         continue;
253                 if (gl->gl_sbd != sdp)
254                         continue;
255
256                 atomic_inc(&gl->gl_ref);
257
258                 return gl;
259         }
260
261         return NULL;
262 }
263
264 /**
265  * gfs2_glock_find() - Find glock by lock number
266  * @sdp: The GFS2 superblock
267  * @name: The lock name
268  *
269  * Returns: NULL, or the struct gfs2_glock with the requested number
270  */
271
272 static struct gfs2_glock *gfs2_glock_find(const struct gfs2_sbd *sdp,
273                                           const struct lm_lockname *name)
274 {
275         unsigned int hash = gl_hash(sdp, name);
276         struct gfs2_glock *gl;
277
278         read_lock(gl_lock_addr(hash));
279         gl = search_bucket(hash, sdp, name);
280         read_unlock(gl_lock_addr(hash));
281
282         return gl;
283 }
284
285 /**
286  * gfs2_glock_get() - Get a glock, or create one if one doesn't exist
287  * @sdp: The GFS2 superblock
288  * @number: the lock number
289  * @glops: The glock_operations to use
290  * @create: If 0, don't create the glock if it doesn't exist
291  * @glp: the glock is returned here
292  *
293  * This does not lock a glock, just finds/creates structures for one.
294  *
295  * Returns: errno
296  */
297
298 int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
299                    const struct gfs2_glock_operations *glops, int create,
300                    struct gfs2_glock **glp)
301 {
302         struct lm_lockname name = { .ln_number = number, .ln_type = glops->go_type };
303         struct gfs2_glock *gl, *tmp;
304         unsigned int hash = gl_hash(sdp, &name);
305         int error;
306
307         read_lock(gl_lock_addr(hash));
308         gl = search_bucket(hash, sdp, &name);
309         read_unlock(gl_lock_addr(hash));
310
311         if (gl || !create) {
312                 *glp = gl;
313                 return 0;
314         }
315
316         gl = kmem_cache_alloc(gfs2_glock_cachep, GFP_KERNEL);
317         if (!gl)
318                 return -ENOMEM;
319
320         gl->gl_flags = 0;
321         gl->gl_name = name;
322         atomic_set(&gl->gl_ref, 1);
323         gl->gl_state = LM_ST_UNLOCKED;
324         gl->gl_hash = hash;
325         gl->gl_owner = NULL;
326         gl->gl_ip = 0;
327         gl->gl_ops = glops;
328         gl->gl_req_gh = NULL;
329         gl->gl_req_bh = NULL;
330         gl->gl_vn = 0;
331         gl->gl_stamp = jiffies;
332         gl->gl_object = NULL;
333         gl->gl_sbd = sdp;
334         gl->gl_aspace = NULL;
335         lops_init_le(&gl->gl_le, &gfs2_glock_lops);
336
337         /* If this glock protects actual on-disk data or metadata blocks,
338            create a VFS inode to manage the pages/buffers holding them. */
339         if (glops == &gfs2_inode_glops || glops == &gfs2_rgrp_glops) {
340                 gl->gl_aspace = gfs2_aspace_get(sdp);
341                 if (!gl->gl_aspace) {
342                         error = -ENOMEM;
343                         goto fail;
344                 }
345         }
346
347         error = gfs2_lm_get_lock(sdp, &name, &gl->gl_lock);
348         if (error)
349                 goto fail_aspace;
350
351         write_lock(gl_lock_addr(hash));
352         tmp = search_bucket(hash, sdp, &name);
353         if (tmp) {
354                 write_unlock(gl_lock_addr(hash));
355                 glock_free(gl);
356                 gl = tmp;
357         } else {
358                 hlist_add_head(&gl->gl_list, &gl_hash_table[hash].hb_list);
359                 write_unlock(gl_lock_addr(hash));
360         }
361
362         *glp = gl;
363
364         return 0;
365
366 fail_aspace:
367         if (gl->gl_aspace)
368                 gfs2_aspace_put(gl->gl_aspace);
369 fail:
370         kmem_cache_free(gfs2_glock_cachep, gl);
371         return error;
372 }
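
/*
 * A minimal usage sketch for gfs2_glock_get() (illustrative only; the
 * function name and the "blkno" parameter below are made up for the
 * example).  Every reference obtained here must be balanced by a
 * gfs2_glock_put().
 */
#if 0
static int example_get_and_put(struct gfs2_sbd *sdp, u64 blkno)
{
        struct gfs2_glock *gl;
        int error;

        error = gfs2_glock_get(sdp, blkno, &gfs2_inode_glops, CREATE, &gl);
        if (error)
                return error;

        /* ... queue holders on gl, or just inspect it ... */

        gfs2_glock_put(gl);
        return 0;
}
#endif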
373
374 /**
375  * gfs2_holder_init - initialize a struct gfs2_holder in the default way
376  * @gl: the glock
377  * @state: the state we're requesting
378  * @flags: the modifier flags
379  * @gh: the holder structure
380  *
381  */
382
383 void gfs2_holder_init(struct gfs2_glock *gl, unsigned int state, unsigned flags,
384                       struct gfs2_holder *gh)
385 {
386         INIT_LIST_HEAD(&gh->gh_list);
387         gh->gh_gl = gl;
388         gh->gh_ip = (unsigned long)__builtin_return_address(0);
389         gh->gh_owner = current;
390         gh->gh_state = state;
391         gh->gh_flags = flags;
392         gh->gh_error = 0;
393         gh->gh_iflags = 0;
394         gfs2_glock_hold(gl);
395 }
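
/*
 * A minimal sketch of the normal holder life cycle (illustrative only;
 * "example_holder_lifecycle" is a made-up name).  gfs2_glock_nq_init()
 * from glock.h combines gfs2_holder_init() with gfs2_glock_nq(), and the
 * matching release is gfs2_glock_dq_uninit() defined later in this file.
 */
#if 0
static int example_holder_lifecycle(struct gfs2_glock *gl)
{
        struct gfs2_holder gh;
        int error;

        error = gfs2_glock_nq_init(gl, LM_ST_SHARED, 0, &gh);
        if (error)
                return error;

        /* ... the glock is held in LM_ST_SHARED here ... */

        gfs2_glock_dq_uninit(&gh);
        return 0;
}
#endif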
396
397 /**
398  * gfs2_holder_reinit - reinitialize a struct gfs2_holder so we can requeue it
399  * @state: the state we're requesting
400  * @flags: the modifier flags
401  * @gh: the holder structure
402  *
403  * Don't mess with the glock.
404  *
405  */
406
407 void gfs2_holder_reinit(unsigned int state, unsigned flags, struct gfs2_holder *gh)
408 {
409         gh->gh_state = state;
410         gh->gh_flags = flags;
411         gh->gh_iflags &= 1 << HIF_ALLOCED;
412         gh->gh_ip = (unsigned long)__builtin_return_address(0);
413 }
414
415 /**
416  * gfs2_holder_uninit - uninitialize a holder structure (drop glock reference)
417  * @gh: the holder structure
418  *
419  */
420
421 void gfs2_holder_uninit(struct gfs2_holder *gh)
422 {
423         gfs2_glock_put(gh->gh_gl);
424         gh->gh_gl = NULL;
425         gh->gh_ip = 0;
426 }
427
428 /**
429  * gfs2_holder_get - get a struct gfs2_holder structure
430  * @gl: the glock
431  * @state: the state we're requesting
432  * @flags: the modifier flags
433  * @gfp_flags: the flags to use for the memory allocation
434  *
435  * Figure out how big an impact this function has.  Either:
436  * 1) Replace it with a cache of structures hanging off the struct gfs2_sbd
437  * 2) Leave it like it is
438  *
439  * Returns: the holder structure, NULL on ENOMEM
440  */
441
442 static struct gfs2_holder *gfs2_holder_get(struct gfs2_glock *gl,
443                                            unsigned int state,
444                                            int flags, gfp_t gfp_flags)
445 {
446         struct gfs2_holder *gh;
447
448         gh = kmalloc(sizeof(struct gfs2_holder), gfp_flags);
449         if (!gh)
450                 return NULL;
451
452         gfs2_holder_init(gl, state, flags, gh);
453         set_bit(HIF_ALLOCED, &gh->gh_iflags);
454         gh->gh_ip = (unsigned long)__builtin_return_address(0);
455         return gh;
456 }
457
458 /**
459  * gfs2_holder_put - get rid of a struct gfs2_holder structure
460  * @gh: the holder structure
461  *
462  */
463
464 static void gfs2_holder_put(struct gfs2_holder *gh)
465 {
466         gfs2_holder_uninit(gh);
467         kfree(gh);
468 }
469
470 static void gfs2_holder_dispose_or_wake(struct gfs2_holder *gh)
471 {
472         if (test_bit(HIF_DEALLOC, &gh->gh_iflags)) {
473                 gfs2_holder_put(gh);
474                 return;
475         }
476         clear_bit(HIF_WAIT, &gh->gh_iflags);
477         smp_mb();
478         wake_up_bit(&gh->gh_iflags, HIF_WAIT);
479 }
480
481 static int holder_wait(void *word)
482 {
483         schedule();
484         return 0;
485 }
486
487 static void wait_on_holder(struct gfs2_holder *gh)
488 {
489         might_sleep();
490         wait_on_bit(&gh->gh_iflags, HIF_WAIT, holder_wait, TASK_UNINTERRUPTIBLE);
491 }
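
/*
 * To spell out the HIF_WAIT handshake: a waiter sets HIF_WAIT before
 * queueing itself and then sleeps in wait_on_bit() above.  Whoever grants
 * or fails the request clears the bit, issues smp_mb() so the clear is
 * visible before the wakeup, and calls wake_up_bit() - see
 * gfs2_holder_dispose_or_wake().  holder_wait() simply reschedules, so the
 * wait loop only exits once HIF_WAIT has actually been cleared.
 */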
492
493 /**
494  * rq_mutex - process a mutex request in the queue
495  * @gh: the glock holder
496  *
497  * Returns: 1 if the queue is blocked
498  */
499
500 static int rq_mutex(struct gfs2_holder *gh)
501 {
502         struct gfs2_glock *gl = gh->gh_gl;
503
504         list_del_init(&gh->gh_list);
505         /*  gh->gh_error never examined.  */
506         set_bit(GLF_LOCK, &gl->gl_flags);
507         clear_bit(HIF_WAIT, &gh->gh_iflags);
508         smp_mb();
509         wake_up_bit(&gh->gh_iflags, HIF_WAIT);
510
511         return 1;
512 }
513
514 /**
515  * rq_promote - process a promote request in the queue
516  * @gh: the glock holder
517  *
518  * Acquire a new inter-node lock, or change a lock state to more restrictive.
519  *
520  * Returns: 1 if the queue is blocked
521  */
522
523 static int rq_promote(struct gfs2_holder *gh)
524 {
525         struct gfs2_glock *gl = gh->gh_gl;
526         struct gfs2_sbd *sdp = gl->gl_sbd;
527         const struct gfs2_glock_operations *glops = gl->gl_ops;
528
529         if (!relaxed_state_ok(gl->gl_state, gh->gh_state, gh->gh_flags)) {
530                 if (list_empty(&gl->gl_holders)) {
531                         gl->gl_req_gh = gh;
532                         set_bit(GLF_LOCK, &gl->gl_flags);
533                         spin_unlock(&gl->gl_spin);
534
535                         if (atomic_read(&sdp->sd_reclaim_count) >
536                             gfs2_tune_get(sdp, gt_reclaim_limit) &&
537                             !(gh->gh_flags & LM_FLAG_PRIORITY)) {
538                                 gfs2_reclaim_glock(sdp);
539                                 gfs2_reclaim_glock(sdp);
540                         }
541
542                         glops->go_xmote_th(gl, gh->gh_state, gh->gh_flags);
543                         spin_lock(&gl->gl_spin);
544                 }
545                 return 1;
546         }
547
548         if (list_empty(&gl->gl_holders)) {
549                 set_bit(HIF_FIRST, &gh->gh_iflags);
550                 set_bit(GLF_LOCK, &gl->gl_flags);
551         } else {
552                 struct gfs2_holder *next_gh;
553                 if (gh->gh_state == LM_ST_EXCLUSIVE)
554                         return 1;
555                 next_gh = list_entry(gl->gl_holders.next, struct gfs2_holder,
556                                      gh_list);
557                 if (next_gh->gh_state == LM_ST_EXCLUSIVE)
558                          return 1;
559         }
560
561         list_move_tail(&gh->gh_list, &gl->gl_holders);
562         gh->gh_error = 0;
563         set_bit(HIF_HOLDER, &gh->gh_iflags);
564
565         gfs2_holder_dispose_or_wake(gh);
566
567         return 0;
568 }
569
570 /**
571  * rq_demote - process a demote request in the queue
572  * @gh: the glock holder
573  *
574  * Returns: 1 if the queue is blocked
575  */
576
577 static int rq_demote(struct gfs2_holder *gh)
578 {
579         struct gfs2_glock *gl = gh->gh_gl;
580         const struct gfs2_glock_operations *glops = gl->gl_ops;
581
582         if (!list_empty(&gl->gl_holders))
583                 return 1;
584
585         if (gl->gl_state == gh->gh_state || gl->gl_state == LM_ST_UNLOCKED) {
586                 list_del_init(&gh->gh_list);
587                 gh->gh_error = 0;
588                 spin_unlock(&gl->gl_spin);
589                 gfs2_holder_dispose_or_wake(gh);
590                 spin_lock(&gl->gl_spin);
591         } else {
592                 gl->gl_req_gh = gh;
593                 set_bit(GLF_LOCK, &gl->gl_flags);
594                 spin_unlock(&gl->gl_spin);
595
596                 if (gh->gh_state == LM_ST_UNLOCKED ||
597                     gl->gl_state != LM_ST_EXCLUSIVE)
598                         glops->go_drop_th(gl);
599                 else
600                         glops->go_xmote_th(gl, gh->gh_state, gh->gh_flags);
601
602                 spin_lock(&gl->gl_spin);
603         }
604
605         return 0;
606 }
607
608 /**
609  * run_queue - process holder structures on a glock
610  * @gl: the glock
611  *
612  */
613 static void run_queue(struct gfs2_glock *gl)
614 {
615         struct gfs2_holder *gh;
616         int blocked = 1;
617
618         for (;;) {
619                 if (test_bit(GLF_LOCK, &gl->gl_flags))
620                         break;
621
622                 if (!list_empty(&gl->gl_waiters1)) {
623                         gh = list_entry(gl->gl_waiters1.next,
624                                         struct gfs2_holder, gh_list);
625
626                         if (test_bit(HIF_MUTEX, &gh->gh_iflags))
627                                 blocked = rq_mutex(gh);
628                         else
629                                 gfs2_assert_warn(gl->gl_sbd, 0);
630
631                 } else if (!list_empty(&gl->gl_waiters2) &&
632                            !test_bit(GLF_SKIP_WAITERS2, &gl->gl_flags)) {
633                         gh = list_entry(gl->gl_waiters2.next,
634                                         struct gfs2_holder, gh_list);
635
636                         if (test_bit(HIF_DEMOTE, &gh->gh_iflags))
637                                 blocked = rq_demote(gh);
638                         else
639                                 gfs2_assert_warn(gl->gl_sbd, 0);
640
641                 } else if (!list_empty(&gl->gl_waiters3)) {
642                         gh = list_entry(gl->gl_waiters3.next,
643                                         struct gfs2_holder, gh_list);
644
645                         if (test_bit(HIF_PROMOTE, &gh->gh_iflags))
646                                 blocked = rq_promote(gh);
647                         else
648                                 gfs2_assert_warn(gl->gl_sbd, 0);
649
650                 } else
651                         break;
652
653                 if (blocked)
654                         break;
655         }
656 }
657
658 /**
659  * gfs2_glmutex_lock - acquire a local lock on a glock
660  * @gl: the glock
661  *
662  * Gives caller exclusive access to manipulate a glock structure.
663  */
664
665 static void gfs2_glmutex_lock(struct gfs2_glock *gl)
666 {
667         struct gfs2_holder gh;
668
669         gfs2_holder_init(gl, 0, 0, &gh);
670         set_bit(HIF_MUTEX, &gh.gh_iflags);
671         if (test_and_set_bit(HIF_WAIT, &gh.gh_iflags))
672                 BUG();
673
674         spin_lock(&gl->gl_spin);
675         if (test_and_set_bit(GLF_LOCK, &gl->gl_flags)) {
676                 list_add_tail(&gh.gh_list, &gl->gl_waiters1);
677         } else {
678                 gl->gl_owner = current;
679                 gl->gl_ip = (unsigned long)__builtin_return_address(0);
680                 clear_bit(HIF_WAIT, &gh.gh_iflags);
681                 smp_mb();
682                 wake_up_bit(&gh.gh_iflags, HIF_WAIT);
683         }
684         spin_unlock(&gl->gl_spin);
685
686         wait_on_holder(&gh);
687         gfs2_holder_uninit(&gh);
688 }
689
690 /**
691  * gfs2_glmutex_trylock - try to acquire a local lock on a glock
692  * @gl: the glock
693  *
694  * Returns: 1 if the glock is acquired
695  */
696
697 static int gfs2_glmutex_trylock(struct gfs2_glock *gl)
698 {
699         int acquired = 1;
700
701         spin_lock(&gl->gl_spin);
702         if (test_and_set_bit(GLF_LOCK, &gl->gl_flags)) {
703                 acquired = 0;
704         } else {
705                 gl->gl_owner = current;
706                 gl->gl_ip = (unsigned long)__builtin_return_address(0);
707         }
708         spin_unlock(&gl->gl_spin);
709
710         return acquired;
711 }
712
713 /**
714  * gfs2_glmutex_unlock - release a local lock on a glock
715  * @gl: the glock
716  *
717  */
718
719 static void gfs2_glmutex_unlock(struct gfs2_glock *gl)
720 {
721         spin_lock(&gl->gl_spin);
722         clear_bit(GLF_LOCK, &gl->gl_flags);
723         gl->gl_owner = NULL;
724         gl->gl_ip = 0;
725         run_queue(gl);
726         BUG_ON(!spin_is_locked(&gl->gl_spin));
727         spin_unlock(&gl->gl_spin);
728 }
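
/*
 * The usage pattern for the glmutex, as seen in callers further down
 * (gfs2_lvb_hold(), gfs2_reclaim_glock(), clear_glock()):
 *
 *      if (gfs2_glmutex_trylock(gl)) {
 *              ... gl may be manipulated exclusively here ...
 *              gfs2_glmutex_unlock(gl);
 *      }
 *
 * gfs2_glmutex_lock() is the blocking variant; either way the unlock also
 * re-runs the waiter queues via run_queue().
 */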
729
730 /**
731  * handle_callback - add a demote request to a lock's queue
732  * @gl: the glock
733  * @state: the state the caller wants us to change to
734  *
735  * Note: This may fail silently if we are out of memory.
736  */
737
738 static void handle_callback(struct gfs2_glock *gl, unsigned int state)
739 {
740         struct gfs2_holder *gh, *new_gh = NULL;
741
742 restart:
743         spin_lock(&gl->gl_spin);
744
745         list_for_each_entry(gh, &gl->gl_waiters2, gh_list) {
746                 if (test_bit(HIF_DEMOTE, &gh->gh_iflags) &&
747                     gl->gl_req_gh != gh) {
748                         if (gh->gh_state != state)
749                                 gh->gh_state = LM_ST_UNLOCKED;
750                         goto out;
751                 }
752         }
753
754         if (new_gh) {
755                 list_add_tail(&new_gh->gh_list, &gl->gl_waiters2);
756                 new_gh = NULL;
757         } else {
758                 spin_unlock(&gl->gl_spin);
759
760                 new_gh = gfs2_holder_get(gl, state, LM_FLAG_TRY, GFP_NOFS);
761                 if (!new_gh)
762                         return;
763                 set_bit(HIF_DEMOTE, &new_gh->gh_iflags);
764                 set_bit(HIF_DEALLOC, &new_gh->gh_iflags);
765                 set_bit(HIF_WAIT, &new_gh->gh_iflags);
766
767                 goto restart;
768         }
769
770 out:
771         spin_unlock(&gl->gl_spin);
772
773         if (new_gh)
774                 gfs2_holder_put(new_gh);
775 }
776
777 /**
778  * state_change - record that the glock is now in a different state
779  * @gl: the glock
780  * @new_state: the new state
781  *
782  */
783
784 static void state_change(struct gfs2_glock *gl, unsigned int new_state)
785 {
786         int held1, held2;
787
788         held1 = (gl->gl_state != LM_ST_UNLOCKED);
789         held2 = (new_state != LM_ST_UNLOCKED);
790
791         if (held1 != held2) {
792                 if (held2)
793                         gfs2_glock_hold(gl);
794                 else
795                         gfs2_glock_put(gl);
796         }
797
798         gl->gl_state = new_state;
799 }
800
801 /**
802  * xmote_bh - Called after the lock module is done acquiring a lock
803  * @gl: The glock in question
804  * @ret: the int returned from the lock module
805  *
806  */
807
808 static void xmote_bh(struct gfs2_glock *gl, unsigned int ret)
809 {
810         struct gfs2_sbd *sdp = gl->gl_sbd;
811         const struct gfs2_glock_operations *glops = gl->gl_ops;
812         struct gfs2_holder *gh = gl->gl_req_gh;
813         int prev_state = gl->gl_state;
814         int op_done = 1;
815
816         gfs2_assert_warn(sdp, test_bit(GLF_LOCK, &gl->gl_flags));
817         gfs2_assert_warn(sdp, queue_empty(gl, &gl->gl_holders));
818         gfs2_assert_warn(sdp, !(ret & LM_OUT_ASYNC));
819
820         state_change(gl, ret & LM_OUT_ST_MASK);
821
822         if (prev_state != LM_ST_UNLOCKED && !(ret & LM_OUT_CACHEABLE)) {
823                 if (glops->go_inval)
824                         glops->go_inval(gl, DIO_METADATA);
825         } else if (gl->gl_state == LM_ST_DEFERRED) {
826                 /* We might not want to do this here.
827                    Look at moving to the inode glops. */
828                 if (glops->go_inval)
829                         glops->go_inval(gl, 0);
830         }
831
832         /*  Deal with each possible exit condition  */
833
834         if (!gh)
835                 gl->gl_stamp = jiffies;
836         else if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags))) {
837                 spin_lock(&gl->gl_spin);
838                 list_del_init(&gh->gh_list);
839                 gh->gh_error = -EIO;
840                 spin_unlock(&gl->gl_spin);
841         } else if (test_bit(HIF_DEMOTE, &gh->gh_iflags)) {
842                 spin_lock(&gl->gl_spin);
843                 list_del_init(&gh->gh_list);
844                 if (gl->gl_state == gh->gh_state ||
845                     gl->gl_state == LM_ST_UNLOCKED) {
846                         gh->gh_error = 0;
847                 } else {
848                         if (gfs2_assert_warn(sdp, gh->gh_flags &
849                                         (LM_FLAG_TRY | LM_FLAG_TRY_1CB)) == -1)
850                                 fs_warn(sdp, "ret = 0x%.8X\n", ret);
851                         gh->gh_error = GLR_TRYFAILED;
852                 }
853                 spin_unlock(&gl->gl_spin);
854
855                 if (ret & LM_OUT_CANCELED)
856                         handle_callback(gl, LM_ST_UNLOCKED);
857
858         } else if (ret & LM_OUT_CANCELED) {
859                 spin_lock(&gl->gl_spin);
860                 list_del_init(&gh->gh_list);
861                 gh->gh_error = GLR_CANCELED;
862                 spin_unlock(&gl->gl_spin);
863
864         } else if (relaxed_state_ok(gl->gl_state, gh->gh_state, gh->gh_flags)) {
865                 spin_lock(&gl->gl_spin);
866                 list_move_tail(&gh->gh_list, &gl->gl_holders);
867                 gh->gh_error = 0;
868                 set_bit(HIF_HOLDER, &gh->gh_iflags);
869                 spin_unlock(&gl->gl_spin);
870
871                 set_bit(HIF_FIRST, &gh->gh_iflags);
872
873                 op_done = 0;
874
875         } else if (gh->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB)) {
876                 spin_lock(&gl->gl_spin);
877                 list_del_init(&gh->gh_list);
878                 gh->gh_error = GLR_TRYFAILED;
879                 spin_unlock(&gl->gl_spin);
880
881         } else {
882                 if (gfs2_assert_withdraw(sdp, 0) == -1)
883                         fs_err(sdp, "ret = 0x%.8X\n", ret);
884         }
885
886         if (glops->go_xmote_bh)
887                 glops->go_xmote_bh(gl);
888
889         if (op_done) {
890                 spin_lock(&gl->gl_spin);
891                 gl->gl_req_gh = NULL;
892                 gl->gl_req_bh = NULL;
893                 clear_bit(GLF_LOCK, &gl->gl_flags);
894                 run_queue(gl);
895                 spin_unlock(&gl->gl_spin);
896         }
897
898         gfs2_glock_put(gl);
899
900         if (gh)
901                 gfs2_holder_dispose_or_wake(gh);
902 }
903
904 /**
905  * gfs2_glock_xmote_th - Call into the lock module to acquire or change a glock
906  * @gl: The glock in question
907  * @state: the requested state
908  * @flags: modifier flags to the lock call
909  *
910  */
911
912 void gfs2_glock_xmote_th(struct gfs2_glock *gl, unsigned int state, int flags)
913 {
914         struct gfs2_sbd *sdp = gl->gl_sbd;
915         const struct gfs2_glock_operations *glops = gl->gl_ops;
916         int lck_flags = flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB |
917                                  LM_FLAG_NOEXP | LM_FLAG_ANY |
918                                  LM_FLAG_PRIORITY);
919         unsigned int lck_ret;
920
921         gfs2_assert_warn(sdp, test_bit(GLF_LOCK, &gl->gl_flags));
922         gfs2_assert_warn(sdp, queue_empty(gl, &gl->gl_holders));
923         gfs2_assert_warn(sdp, state != LM_ST_UNLOCKED);
924         gfs2_assert_warn(sdp, state != gl->gl_state);
925
926         if (gl->gl_state == LM_ST_EXCLUSIVE && glops->go_sync)
927                 glops->go_sync(gl);
928
929         gfs2_glock_hold(gl);
930         gl->gl_req_bh = xmote_bh;
931
932         lck_ret = gfs2_lm_lock(sdp, gl->gl_lock, gl->gl_state, state, lck_flags);
933
934         if (gfs2_assert_withdraw(sdp, !(lck_ret & LM_OUT_ERROR)))
935                 return;
936
937         if (lck_ret & LM_OUT_ASYNC)
938                 gfs2_assert_warn(sdp, lck_ret == LM_OUT_ASYNC);
939         else
940                 xmote_bh(gl, lck_ret);
941 }
942
943 /**
944  * drop_bh - Called after a lock module unlock completes
945  * @gl: the glock
946  * @ret: the return status
947  *
948  * Doesn't wake up the process waiting on the struct gfs2_holder (if any)
949  * Doesn't drop the reference on the glock the top half took out
950  *
951  */
952
953 static void drop_bh(struct gfs2_glock *gl, unsigned int ret)
954 {
955         struct gfs2_sbd *sdp = gl->gl_sbd;
956         const struct gfs2_glock_operations *glops = gl->gl_ops;
957         struct gfs2_holder *gh = gl->gl_req_gh;
958
959         gfs2_assert_warn(sdp, test_bit(GLF_LOCK, &gl->gl_flags));
960         gfs2_assert_warn(sdp, queue_empty(gl, &gl->gl_holders));
961         gfs2_assert_warn(sdp, !ret);
962
963         state_change(gl, LM_ST_UNLOCKED);
964
965         if (glops->go_inval)
966                 glops->go_inval(gl, DIO_METADATA);
967
968         if (gh) {
969                 spin_lock(&gl->gl_spin);
970                 list_del_init(&gh->gh_list);
971                 gh->gh_error = 0;
972                 spin_unlock(&gl->gl_spin);
973         }
974
975         if (glops->go_drop_bh)
976                 glops->go_drop_bh(gl);
977
978         spin_lock(&gl->gl_spin);
979         gl->gl_req_gh = NULL;
980         gl->gl_req_bh = NULL;
981         clear_bit(GLF_LOCK, &gl->gl_flags);
982         run_queue(gl);
983         spin_unlock(&gl->gl_spin);
984
985         gfs2_glock_put(gl);
986
987         if (gh)
988                 gfs2_holder_dispose_or_wake(gh);
989 }
990
991 /**
992  * gfs2_glock_drop_th - call into the lock module to unlock a lock
993  * @gl: the glock
994  *
995  */
996
997 void gfs2_glock_drop_th(struct gfs2_glock *gl)
998 {
999         struct gfs2_sbd *sdp = gl->gl_sbd;
1000         const struct gfs2_glock_operations *glops = gl->gl_ops;
1001         unsigned int ret;
1002
1003         gfs2_assert_warn(sdp, test_bit(GLF_LOCK, &gl->gl_flags));
1004         gfs2_assert_warn(sdp, queue_empty(gl, &gl->gl_holders));
1005         gfs2_assert_warn(sdp, gl->gl_state != LM_ST_UNLOCKED);
1006
1007         if (gl->gl_state == LM_ST_EXCLUSIVE && glops->go_sync)
1008                 glops->go_sync(gl);
1009
1010         gfs2_glock_hold(gl);
1011         gl->gl_req_bh = drop_bh;
1012
1013         ret = gfs2_lm_unlock(sdp, gl->gl_lock, gl->gl_state);
1014
1015         if (gfs2_assert_withdraw(sdp, !(ret & LM_OUT_ERROR)))
1016                 return;
1017
1018         if (!ret)
1019                 drop_bh(gl, ret);
1020         else
1021                 gfs2_assert_warn(sdp, ret == LM_OUT_ASYNC);
1022 }
1023
1024 /**
1025  * do_cancels - cancel requests for locks stuck waiting on an expire flag
1026  * @gh: the LM_FLAG_PRIORITY holder waiting to acquire the lock
1027  *
1028  * Don't cancel GL_NOCANCEL requests.
1029  */
1030
1031 static void do_cancels(struct gfs2_holder *gh)
1032 {
1033         struct gfs2_glock *gl = gh->gh_gl;
1034
1035         spin_lock(&gl->gl_spin);
1036
1037         while (gl->gl_req_gh != gh &&
1038                !test_bit(HIF_HOLDER, &gh->gh_iflags) &&
1039                !list_empty(&gh->gh_list)) {
1040                 if (gl->gl_req_bh && !(gl->gl_req_gh &&
1041                                      (gl->gl_req_gh->gh_flags & GL_NOCANCEL))) {
1042                         spin_unlock(&gl->gl_spin);
1043                         gfs2_lm_cancel(gl->gl_sbd, gl->gl_lock);
1044                         msleep(100);
1045                         spin_lock(&gl->gl_spin);
1046                 } else {
1047                         spin_unlock(&gl->gl_spin);
1048                         msleep(100);
1049                         spin_lock(&gl->gl_spin);
1050                 }
1051         }
1052
1053         spin_unlock(&gl->gl_spin);
1054 }
1055
1056 /**
1057  * glock_wait_internal - wait on a glock acquisition
1058  * @gh: the glock holder
1059  *
1060  * Returns: 0 on success
1061  */
1062
1063 static int glock_wait_internal(struct gfs2_holder *gh)
1064 {
1065         struct gfs2_glock *gl = gh->gh_gl;
1066         struct gfs2_sbd *sdp = gl->gl_sbd;
1067         const struct gfs2_glock_operations *glops = gl->gl_ops;
1068
1069         if (test_bit(HIF_ABORTED, &gh->gh_iflags))
1070                 return -EIO;
1071
1072         if (gh->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB)) {
1073                 spin_lock(&gl->gl_spin);
1074                 if (gl->gl_req_gh != gh &&
1075                     !test_bit(HIF_HOLDER, &gh->gh_iflags) &&
1076                     !list_empty(&gh->gh_list)) {
1077                         list_del_init(&gh->gh_list);
1078                         gh->gh_error = GLR_TRYFAILED;
1079                         run_queue(gl);
1080                         spin_unlock(&gl->gl_spin);
1081                         return gh->gh_error;
1082                 }
1083                 spin_unlock(&gl->gl_spin);
1084         }
1085
1086         if (gh->gh_flags & LM_FLAG_PRIORITY)
1087                 do_cancels(gh);
1088
1089         wait_on_holder(gh);
1090         if (gh->gh_error)
1091                 return gh->gh_error;
1092
1093         gfs2_assert_withdraw(sdp, test_bit(HIF_HOLDER, &gh->gh_iflags));
1094         gfs2_assert_withdraw(sdp, relaxed_state_ok(gl->gl_state, gh->gh_state,
1095                                                    gh->gh_flags));
1096
1097         if (test_bit(HIF_FIRST, &gh->gh_iflags)) {
1098                 gfs2_assert_warn(sdp, test_bit(GLF_LOCK, &gl->gl_flags));
1099
1100                 if (glops->go_lock) {
1101                         gh->gh_error = glops->go_lock(gh);
1102                         if (gh->gh_error) {
1103                                 spin_lock(&gl->gl_spin);
1104                                 list_del_init(&gh->gh_list);
1105                                 spin_unlock(&gl->gl_spin);
1106                         }
1107                 }
1108
1109                 spin_lock(&gl->gl_spin);
1110                 gl->gl_req_gh = NULL;
1111                 gl->gl_req_bh = NULL;
1112                 clear_bit(GLF_LOCK, &gl->gl_flags);
1113                 run_queue(gl);
1114                 spin_unlock(&gl->gl_spin);
1115         }
1116
1117         return gh->gh_error;
1118 }
1119
1120 static inline struct gfs2_holder *
1121 find_holder_by_owner(struct list_head *head, struct task_struct *owner)
1122 {
1123         struct gfs2_holder *gh;
1124
1125         list_for_each_entry(gh, head, gh_list) {
1126                 if (gh->gh_owner == owner)
1127                         return gh;
1128         }
1129
1130         return NULL;
1131 }
1132
1133 /**
1134  * add_to_queue - Add a holder to the wait queue (but look for recursion)
1135  * @gh: the holder structure to add
1136  *
1137  */
1138
1139 static void add_to_queue(struct gfs2_holder *gh)
1140 {
1141         struct gfs2_glock *gl = gh->gh_gl;
1142         struct gfs2_holder *existing;
1143
1144         BUG_ON(!gh->gh_owner);
1145         if (test_and_set_bit(HIF_WAIT, &gh->gh_iflags))
1146                 BUG();
1147
1148         existing = find_holder_by_owner(&gl->gl_holders, gh->gh_owner);
1149         if (existing) {
1150                 print_symbol(KERN_WARNING "original: %s\n", existing->gh_ip);
1151                 printk(KERN_INFO "pid : %d\n", existing->gh_owner->pid);
1152                 printk(KERN_INFO "lock type : %d lock state : %d\n",
1153                                 existing->gh_gl->gl_name.ln_type, existing->gh_gl->gl_state);
1154                 print_symbol(KERN_WARNING "new: %s\n", gh->gh_ip);
1155                 printk(KERN_INFO "pid : %d\n", gh->gh_owner->pid);
1156                 printk(KERN_INFO "lock type : %d lock state : %d\n",
1157                                 gl->gl_name.ln_type, gl->gl_state);
1158                 BUG();
1159         }
1160
1161         existing = find_holder_by_owner(&gl->gl_waiters3, gh->gh_owner);
1162         if (existing) {
1163                 print_symbol(KERN_WARNING "original: %s\n", existing->gh_ip);
1164                 print_symbol(KERN_WARNING "new: %s\n", gh->gh_ip);
1165                 BUG();
1166         }
1167
1168         if (gh->gh_flags & LM_FLAG_PRIORITY)
1169                 list_add(&gh->gh_list, &gl->gl_waiters3);
1170         else
1171                 list_add_tail(&gh->gh_list, &gl->gl_waiters3);
1172 }
1173
1174 /**
1175  * gfs2_glock_nq - enqueue a struct gfs2_holder onto a glock (acquire a glock)
1176  * @gh: the holder structure
1177  *
1178  * if (gh->gh_flags & GL_ASYNC), this never returns an error
1179  *
1180  * Returns: 0, GLR_TRYFAILED, or errno on failure
1181  */
1182
1183 int gfs2_glock_nq(struct gfs2_holder *gh)
1184 {
1185         struct gfs2_glock *gl = gh->gh_gl;
1186         struct gfs2_sbd *sdp = gl->gl_sbd;
1187         int error = 0;
1188
1189 restart:
1190         if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags))) {
1191                 set_bit(HIF_ABORTED, &gh->gh_iflags);
1192                 return -EIO;
1193         }
1194
1195         set_bit(HIF_PROMOTE, &gh->gh_iflags);
1196
1197         spin_lock(&gl->gl_spin);
1198         add_to_queue(gh);
1199         run_queue(gl);
1200         spin_unlock(&gl->gl_spin);
1201
1202         if (!(gh->gh_flags & GL_ASYNC)) {
1203                 error = glock_wait_internal(gh);
1204                 if (error == GLR_CANCELED) {
1205                         msleep(100);
1206                         goto restart;
1207                 }
1208         }
1209
1210         return error;
1211 }
1212
1213 /**
1214  * gfs2_glock_poll - poll to see if an async request has been completed
1215  * @gh: the holder
1216  *
1217  * Returns: 1 if the request is ready to be gfs2_glock_wait()ed on
1218  */
1219
1220 int gfs2_glock_poll(struct gfs2_holder *gh)
1221 {
1222         struct gfs2_glock *gl = gh->gh_gl;
1223         int ready = 0;
1224
1225         spin_lock(&gl->gl_spin);
1226
1227         if (test_bit(HIF_HOLDER, &gh->gh_iflags))
1228                 ready = 1;
1229         else if (list_empty(&gh->gh_list)) {
1230                 if (gh->gh_error == GLR_CANCELED) {
1231                         spin_unlock(&gl->gl_spin);
1232                         msleep(100);
1233                         if (gfs2_glock_nq(gh))
1234                                 return 1;
1235                         return 0;
1236                 } else
1237                         ready = 1;
1238         }
1239
1240         spin_unlock(&gl->gl_spin);
1241
1242         return ready;
1243 }
1244
1245 /**
1246  * gfs2_glock_wait - wait for a lock acquisition that was requested with GL_ASYNC
1247  * @gh: the holder structure
1248  *
1249  * Returns: 0, GLR_TRYFAILED, or errno on failure
1250  */
1251
1252 int gfs2_glock_wait(struct gfs2_holder *gh)
1253 {
1254         int error;
1255
1256         error = glock_wait_internal(gh);
1257         if (error == GLR_CANCELED) {
1258                 msleep(100);
1259                 gh->gh_flags &= ~GL_ASYNC;
1260                 error = gfs2_glock_nq(gh);
1261         }
1262
1263         return error;
1264 }
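
/*
 * A minimal sketch of asynchronous acquisition (illustrative only;
 * "example_async_acquire" is a made-up name).  With GL_ASYNC the enqueue
 * returns immediately; the caller may poll with gfs2_glock_poll() and
 * collects the final result with gfs2_glock_wait().
 */
#if 0
static int example_async_acquire(struct gfs2_glock *gl)
{
        struct gfs2_holder gh;
        int error;

        gfs2_holder_init(gl, LM_ST_SHARED, GL_ASYNC, &gh);
        gfs2_glock_nq(&gh);             /* never returns an error for GL_ASYNC */

        while (!gfs2_glock_poll(&gh))
                msleep(10);             /* ... or do useful work meanwhile ... */

        error = gfs2_glock_wait(&gh);
        if (!error)
                gfs2_glock_dq(&gh);
        gfs2_holder_uninit(&gh);
        return error;
}
#endif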
1265
1266 /**
1267  * gfs2_glock_dq - dequeue a struct gfs2_holder from a glock (release a glock)
1268  * @gh: the glock holder
1269  *
1270  */
1271
1272 void gfs2_glock_dq(struct gfs2_holder *gh)
1273 {
1274         struct gfs2_glock *gl = gh->gh_gl;
1275         const struct gfs2_glock_operations *glops = gl->gl_ops;
1276
1277         if (gh->gh_flags & GL_NOCACHE)
1278                 handle_callback(gl, LM_ST_UNLOCKED);
1279
1280         gfs2_glmutex_lock(gl);
1281
1282         spin_lock(&gl->gl_spin);
1283         list_del_init(&gh->gh_list);
1284
1285         if (list_empty(&gl->gl_holders)) {
1286                 spin_unlock(&gl->gl_spin);
1287
1288                 if (glops->go_unlock)
1289                         glops->go_unlock(gh);
1290
1291                 gl->gl_stamp = jiffies;
1292
1293                 spin_lock(&gl->gl_spin);
1294         }
1295
1296         clear_bit(GLF_LOCK, &gl->gl_flags);
1297         run_queue(gl);
1298         spin_unlock(&gl->gl_spin);
1299 }
1300
1301 /**
1302  * gfs2_glock_dq_uninit - dequeue a holder from a glock and uninitialize it
1303  * @gh: the holder structure
1304  *
1305  */
1306
1307 void gfs2_glock_dq_uninit(struct gfs2_holder *gh)
1308 {
1309         gfs2_glock_dq(gh);
1310         gfs2_holder_uninit(gh);
1311 }
1312
1313 /**
1314  * gfs2_glock_nq_num - acquire a glock based on lock number
1315  * @sdp: the filesystem
1316  * @number: the lock number
1317  * @glops: the glock operations for the type of glock
1318  * @state: the state to acquire the glock in
1319  * @flags: modifier flags for the acquisition
1320  * @gh: the struct gfs2_holder
1321  *
1322  * Returns: errno
1323  */
1324
1325 int gfs2_glock_nq_num(struct gfs2_sbd *sdp, u64 number,
1326                       const struct gfs2_glock_operations *glops,
1327                       unsigned int state, int flags, struct gfs2_holder *gh)
1328 {
1329         struct gfs2_glock *gl;
1330         int error;
1331
1332         error = gfs2_glock_get(sdp, number, glops, CREATE, &gl);
1333         if (!error) {
1334                 error = gfs2_glock_nq_init(gl, state, flags, gh);
1335                 gfs2_glock_put(gl);
1336         }
1337
1338         return error;
1339 }
1340
1341 /**
1342  * glock_compare - Compare two struct gfs2_glock structures for sorting
1343  * @arg_a: the first structure
1344  * @arg_b: the second structure
1345  *
1346  */
1347
1348 static int glock_compare(const void *arg_a, const void *arg_b)
1349 {
1350         const struct gfs2_holder *gh_a = *(const struct gfs2_holder **)arg_a;
1351         const struct gfs2_holder *gh_b = *(const struct gfs2_holder **)arg_b;
1352         const struct lm_lockname *a = &gh_a->gh_gl->gl_name;
1353         const struct lm_lockname *b = &gh_b->gh_gl->gl_name;
1354
1355         if (a->ln_number > b->ln_number)
1356                 return 1;
1357         if (a->ln_number < b->ln_number)
1358                 return -1;
1359         BUG_ON(gh_a->gh_gl->gl_ops->go_type == gh_b->gh_gl->gl_ops->go_type);
1360         return 0;
1361 }
1362
1363 /**
1364  * nq_m_sync - synchronously acquire more than one glock in deadlock-free order
1365  * @num_gh: the number of structures
1366  * @ghs: an array of struct gfs2_holder structures
1367  *
1368  * Returns: 0 on success (all glocks acquired),
1369  *          errno on failure (no glocks acquired)
1370  */
1371
1372 static int nq_m_sync(unsigned int num_gh, struct gfs2_holder *ghs,
1373                      struct gfs2_holder **p)
1374 {
1375         unsigned int x;
1376         int error = 0;
1377
1378         for (x = 0; x < num_gh; x++)
1379                 p[x] = &ghs[x];
1380
1381         sort(p, num_gh, sizeof(struct gfs2_holder *), glock_compare, NULL);
1382
1383         for (x = 0; x < num_gh; x++) {
1384                 p[x]->gh_flags &= ~(LM_FLAG_TRY | GL_ASYNC);
1385
1386                 error = gfs2_glock_nq(p[x]);
1387                 if (error) {
1388                         while (x--)
1389                                 gfs2_glock_dq(p[x]);
1390                         break;
1391                 }
1392         }
1393
1394         return error;
1395 }
1396
1397 /**
1398  * gfs2_glock_nq_m - acquire multiple glocks
1399  * @num_gh: the number of structures
1400  * @ghs: an array of struct gfs2_holder structures
1401  *
1402  * Figure out how big an impact this function has.  Either:
1403  * 1) Replace this code with code that calls gfs2_glock_prefetch()
1404  * 2) Forget async stuff and just call nq_m_sync()
1405  * 3) Leave it like it is
1406  *
1407  * Returns: 0 on success (all glocks acquired),
1408  *          errno on failure (no glocks acquired)
1409  */
1410
1411 int gfs2_glock_nq_m(unsigned int num_gh, struct gfs2_holder *ghs)
1412 {
1413         int *e;
1414         unsigned int x;
1415         int borked = 0, serious = 0;
1416         int error = 0;
1417
1418         if (!num_gh)
1419                 return 0;
1420
1421         if (num_gh == 1) {
1422                 ghs->gh_flags &= ~(LM_FLAG_TRY | GL_ASYNC);
1423                 return gfs2_glock_nq(ghs);
1424         }
1425
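        /* e does double duty: first as an array of per-holder error codes,
           then (cast below) as the scratch array of holder pointers handed
           to nq_m_sync().  A holder pointer is at least as large as an int,
           so a single allocation covers both uses. */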
1426         e = kcalloc(num_gh, sizeof(struct gfs2_holder *), GFP_KERNEL);
1427         if (!e)
1428                 return -ENOMEM;
1429
1430         for (x = 0; x < num_gh; x++) {
1431                 ghs[x].gh_flags |= LM_FLAG_TRY | GL_ASYNC;
1432                 error = gfs2_glock_nq(&ghs[x]);
1433                 if (error) {
1434                         borked = 1;
1435                         serious = error;
1436                         num_gh = x;
1437                         break;
1438                 }
1439         }
1440
1441         for (x = 0; x < num_gh; x++) {
1442                 error = e[x] = glock_wait_internal(&ghs[x]);
1443                 if (error) {
1444                         borked = 1;
1445                         if (error != GLR_TRYFAILED && error != GLR_CANCELED)
1446                                 serious = error;
1447                 }
1448         }
1449
1450         if (!borked) {
1451                 kfree(e);
1452                 return 0;
1453         }
1454
1455         for (x = 0; x < num_gh; x++)
1456                 if (!e[x])
1457                         gfs2_glock_dq(&ghs[x]);
1458
1459         if (serious)
1460                 error = serious;
1461         else {
1462                 for (x = 0; x < num_gh; x++)
1463                         gfs2_holder_reinit(ghs[x].gh_state, ghs[x].gh_flags,
1464                                           &ghs[x]);
1465                 error = nq_m_sync(num_gh, ghs, (struct gfs2_holder **)e);
1466         }
1467
1468         kfree(e);
1469
1470         return error;
1471 }
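
/*
 * A minimal sketch of taking several glocks at once (illustrative only;
 * "example_lock_pair" is a made-up name, and gl_a/gl_b are assumed to be
 * glocks the caller already references).  gfs2_glock_nq_m() acquires all
 * of the queued holders or none of them; gfs2_glock_dq_m() releases them.
 */
#if 0
static int example_lock_pair(struct gfs2_glock *gl_a, struct gfs2_glock *gl_b)
{
        struct gfs2_holder ghs[2];
        int error;

        gfs2_holder_init(gl_a, LM_ST_EXCLUSIVE, 0, &ghs[0]);
        gfs2_holder_init(gl_b, LM_ST_EXCLUSIVE, 0, &ghs[1]);

        error = gfs2_glock_nq_m(2, ghs);
        if (!error) {
                /* ... both locks are held here ... */
                gfs2_glock_dq_m(2, ghs);
        }

        gfs2_holder_uninit(&ghs[0]);
        gfs2_holder_uninit(&ghs[1]);
        return error;
}
#endif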
1472
1473 /**
1474  * gfs2_glock_dq_m - release multiple glocks
1475  * @num_gh: the number of structures
1476  * @ghs: an array of struct gfs2_holder structures
1477  *
1478  */
1479
1480 void gfs2_glock_dq_m(unsigned int num_gh, struct gfs2_holder *ghs)
1481 {
1482         unsigned int x;
1483
1484         for (x = 0; x < num_gh; x++)
1485                 gfs2_glock_dq(&ghs[x]);
1486 }
1487
1488 /**
1489  * gfs2_glock_dq_uninit_m - release and uninitialize multiple glocks
1490  * @num_gh: the number of structures
1491  * @ghs: an array of struct gfs2_holder structures
1492  *
1493  */
1494
1495 void gfs2_glock_dq_uninit_m(unsigned int num_gh, struct gfs2_holder *ghs)
1496 {
1497         unsigned int x;
1498
1499         for (x = 0; x < num_gh; x++)
1500                 gfs2_glock_dq_uninit(&ghs[x]);
1501 }
1502
1503 /**
1504  * gfs2_lvb_hold - attach a LVB to a glock
1505  * @gl: The glock in question
1506  *
1507  */
1508
1509 int gfs2_lvb_hold(struct gfs2_glock *gl)
1510 {
1511         int error;
1512
1513         gfs2_glmutex_lock(gl);
1514
1515         if (!atomic_read(&gl->gl_lvb_count)) {
1516                 error = gfs2_lm_hold_lvb(gl->gl_sbd, gl->gl_lock, &gl->gl_lvb);
1517                 if (error) {
1518                         gfs2_glmutex_unlock(gl);
1519                         return error;
1520                 }
1521                 gfs2_glock_hold(gl);
1522         }
1523         atomic_inc(&gl->gl_lvb_count);
1524
1525         gfs2_glmutex_unlock(gl);
1526
1527         return 0;
1528 }
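
/*
 * The LVB reference is counted, so every successful gfs2_lvb_hold() must
 * be paired with a gfs2_lvb_unhold(); while held, gl->gl_lvb points at the
 * lock value block supplied by the lock module.  For illustration:
 *
 *      error = gfs2_lvb_hold(gl);
 *      if (!error) {
 *              ... read or update gl->gl_lvb ...
 *              gfs2_lvb_unhold(gl);
 *      }
 */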
1529
1530 /**
1531  * gfs2_lvb_unhold - detach a LVB from a glock
1532  * @gl: The glock in question
1533  *
1534  */
1535
1536 void gfs2_lvb_unhold(struct gfs2_glock *gl)
1537 {
1538         gfs2_glock_hold(gl);
1539         gfs2_glmutex_lock(gl);
1540
1541         gfs2_assert(gl->gl_sbd, atomic_read(&gl->gl_lvb_count) > 0);
1542         if (atomic_dec_and_test(&gl->gl_lvb_count)) {
1543                 gfs2_lm_unhold_lvb(gl->gl_sbd, gl->gl_lock, gl->gl_lvb);
1544                 gl->gl_lvb = NULL;
1545                 gfs2_glock_put(gl);
1546         }
1547
1548         gfs2_glmutex_unlock(gl);
1549         gfs2_glock_put(gl);
1550 }
1551
1552 static void blocking_cb(struct gfs2_sbd *sdp, struct lm_lockname *name,
1553                         unsigned int state)
1554 {
1555         struct gfs2_glock *gl;
1556
1557         gl = gfs2_glock_find(sdp, name);
1558         if (!gl)
1559                 return;
1560
1561         handle_callback(gl, state);
1562
1563         spin_lock(&gl->gl_spin);
1564         run_queue(gl);
1565         spin_unlock(&gl->gl_spin);
1566
1567         gfs2_glock_put(gl);
1568 }
1569
1570 /**
1571  * gfs2_glock_cb - Callback used by locking module
1572  * @sdp: Pointer to the superblock
1573  * @type: Type of callback
1574  * @data: Type dependent data pointer
1575  *
1576  * Called by the locking module when it wants to tell us something.
1577  * Either we need to drop a lock, one of our ASYNC requests completed, or
1578  * a journal from another client needs to be recovered.
1579  */
1580
1581 void gfs2_glock_cb(void *cb_data, unsigned int type, void *data)
1582 {
1583         struct gfs2_sbd *sdp = cb_data;
1584
1585         switch (type) {
1586         case LM_CB_NEED_E:
1587                 blocking_cb(sdp, data, LM_ST_UNLOCKED);
1588                 return;
1589
1590         case LM_CB_NEED_D:
1591                 blocking_cb(sdp, data, LM_ST_DEFERRED);
1592                 return;
1593
1594         case LM_CB_NEED_S:
1595                 blocking_cb(sdp, data, LM_ST_SHARED);
1596                 return;
1597
1598         case LM_CB_ASYNC: {
1599                 struct lm_async_cb *async = data;
1600                 struct gfs2_glock *gl;
1601
1602                 gl = gfs2_glock_find(sdp, &async->lc_name);
1603                 if (gfs2_assert_warn(sdp, gl))
1604                         return;
1605                 if (!gfs2_assert_warn(sdp, gl->gl_req_bh))
1606                         gl->gl_req_bh(gl, async->lc_ret);
1607                 gfs2_glock_put(gl);
1608                 return;
1609         }
1610
1611         case LM_CB_NEED_RECOVERY:
1612                 gfs2_jdesc_make_dirty(sdp, *(unsigned int *)data);
1613                 if (sdp->sd_recoverd_process)
1614                         wake_up_process(sdp->sd_recoverd_process);
1615                 return;
1616
1617         case LM_CB_DROPLOCKS:
1618                 gfs2_gl_hash_clear(sdp, NO_WAIT);
1619                 gfs2_quota_scan(sdp);
1620                 return;
1621
1622         default:
1623                 gfs2_assert_warn(sdp, 0);
1624                 return;
1625         }
1626 }
1627
1628 /**
1629  * demote_ok - Check to see if it's ok to unlock a glock
1630  * @gl: the glock
1631  *
1632  * Returns: 1 if it's ok
1633  */
1634
1635 static int demote_ok(struct gfs2_glock *gl)
1636 {
1637         const struct gfs2_glock_operations *glops = gl->gl_ops;
1638         int demote = 1;
1639
1640         if (test_bit(GLF_STICKY, &gl->gl_flags))
1641                 demote = 0;
1642         else if (glops->go_demote_ok)
1643                 demote = glops->go_demote_ok(gl);
1644
1645         return demote;
1646 }
1647
1648 /**
1649  * gfs2_glock_schedule_for_reclaim - Add a glock to the reclaim list
1650  * @gl: the glock
1651  *
1652  */
1653
1654 void gfs2_glock_schedule_for_reclaim(struct gfs2_glock *gl)
1655 {
1656         struct gfs2_sbd *sdp = gl->gl_sbd;
1657
1658         spin_lock(&sdp->sd_reclaim_lock);
1659         if (list_empty(&gl->gl_reclaim)) {
1660                 gfs2_glock_hold(gl);
1661                 list_add(&gl->gl_reclaim, &sdp->sd_reclaim_list);
1662                 atomic_inc(&sdp->sd_reclaim_count);
1663         }
1664         spin_unlock(&sdp->sd_reclaim_lock);
1665
1666         wake_up(&sdp->sd_reclaim_wq);
1667 }
1668
1669 /**
1670  * gfs2_reclaim_glock - process the next glock on the filesystem's reclaim list
1671  * @sdp: the filesystem
1672  *
1673  * Called from the gfs2_glockd() glock reclaim daemon, or when promoting a
1674  * different glock if we notice that there are a lot of glocks on the
1675  * reclaim list.
1676  *
1677  */
1678
1679 void gfs2_reclaim_glock(struct gfs2_sbd *sdp)
1680 {
1681         struct gfs2_glock *gl;
1682
1683         spin_lock(&sdp->sd_reclaim_lock);
1684         if (list_empty(&sdp->sd_reclaim_list)) {
1685                 spin_unlock(&sdp->sd_reclaim_lock);
1686                 return;
1687         }
1688         gl = list_entry(sdp->sd_reclaim_list.next,
1689                         struct gfs2_glock, gl_reclaim);
1690         list_del_init(&gl->gl_reclaim);
1691         spin_unlock(&sdp->sd_reclaim_lock);
1692
1693         atomic_dec(&sdp->sd_reclaim_count);
1694         atomic_inc(&sdp->sd_reclaimed);
1695
1696         if (gfs2_glmutex_trylock(gl)) {
1697                 if (queue_empty(gl, &gl->gl_holders) &&
1698                     gl->gl_state != LM_ST_UNLOCKED && demote_ok(gl))
1699                         handle_callback(gl, LM_ST_UNLOCKED);
1700                 gfs2_glmutex_unlock(gl);
1701         }
1702
1703         gfs2_glock_put(gl);
1704 }
1705
1706 /**
1707  * examine_bucket - Call a function for each glock in a hash bucket
1708  * @examiner: the function
1709  * @sdp: the filesystem
1710  * @bucket: the bucket
1711  *
1712  * Returns: 1 if the bucket has entries
1713  */
1714
1715 static int examine_bucket(glock_examiner examiner, struct gfs2_sbd *sdp,
1716                           unsigned int hash)
1717 {
1718         struct gfs2_glock *gl, *prev = NULL;
1719         int has_entries = 0;
1720         struct hlist_head *head = &gl_hash_table[hash].hb_list;
1721
1722         read_lock(gl_lock_addr(hash));
1723         /* Can't use hlist_for_each_entry - don't want prefetch here */
1724         if (hlist_empty(head))
1725                 goto out;
1726         gl = list_entry(head->first, struct gfs2_glock, gl_list);
1727         while(1) {
1728                 if (gl->gl_sbd == sdp) {
1729                         gfs2_glock_hold(gl);
1730                         read_unlock(gl_lock_addr(hash));
1731                         if (prev)
1732                                 gfs2_glock_put(prev);
1733                         prev = gl;
1734                         examiner(gl);
1735                         has_entries = 1;
1736                         read_lock(gl_lock_addr(hash));
1737                 }
1738                 if (gl->gl_list.next == NULL)
1739                         break;
1740                 gl = list_entry(gl->gl_list.next, struct gfs2_glock, gl_list);
1741         }
1742 out:
1743         read_unlock(gl_lock_addr(hash));
1744         if (prev)
1745                 gfs2_glock_put(prev);
1746         return has_entries;
1747 }
1748
1749 /**
1750  * scan_glock - look at a glock and see if we can reclaim it
1751  * @gl: the glock to look at
1752  *
1753  */
1754
1755 static void scan_glock(struct gfs2_glock *gl)
1756 {
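             /* Inode glocks with a VFS inode still attached are skipped: the
              * inode holds its own reference, so they are reclaimed when the
              * inode is evicted rather than by the scan. */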
1757         if (gl->gl_ops == &gfs2_inode_glops && gl->gl_object)
1758                 return;
1759
1760         if (gfs2_glmutex_trylock(gl)) {
1761                 if (queue_empty(gl, &gl->gl_holders) &&
1762                     gl->gl_state != LM_ST_UNLOCKED && demote_ok(gl))
1763                         goto out_schedule;
1764                 gfs2_glmutex_unlock(gl);
1765         }
1766         return;
1767
1768 out_schedule:
1769         gfs2_glmutex_unlock(gl);
1770         gfs2_glock_schedule_for_reclaim(gl);
1771 }
1772
1773 /**
1774  * gfs2_scand_internal - Look for glocks and inodes to toss from memory
1775  * @sdp: the filesystem
1776  *
1777  */
1778
1779 void gfs2_scand_internal(struct gfs2_sbd *sdp)
1780 {
1781         unsigned int x;
1782
1783         for (x = 0; x < GFS2_GL_HASH_SIZE; x++)
1784                 examine_bucket(scan_glock, sdp, x);
1785 }
1786
1787 /**
1788  * clear_glock - look at a glock and see if we can free it from the glock cache
1789  * @gl: the glock to look at
1790  *
1791  */
1792
1793 static void clear_glock(struct gfs2_glock *gl)
1794 {
1795         struct gfs2_sbd *sdp = gl->gl_sbd;
1796         int released;
1797
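             /* Drop the reclaim list's reference first; it cannot be the last
              * reference because examine_bucket() holds one of its own while
              * the examiner runs, which is what the assert checks. */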
1798         spin_lock(&sdp->sd_reclaim_lock);
1799         if (!list_empty(&gl->gl_reclaim)) {
1800                 list_del_init(&gl->gl_reclaim);
1801                 atomic_dec(&sdp->sd_reclaim_count);
1802                 spin_unlock(&sdp->sd_reclaim_lock);
1803                 released = gfs2_glock_put(gl);
1804                 gfs2_assert(sdp, !released);
1805         } else {
1806                 spin_unlock(&sdp->sd_reclaim_lock);
1807         }
1808
1809         if (gfs2_glmutex_trylock(gl)) {
1810                 if (queue_empty(gl, &gl->gl_holders) &&
1811                     gl->gl_state != LM_ST_UNLOCKED)
1812                         handle_callback(gl, LM_ST_UNLOCKED);
1813                 gfs2_glmutex_unlock(gl);
1814         }
1815 }
1816
1817 /**
1818  * gfs2_gl_hash_clear - Empty out the glock hash table
1819  * @sdp: the filesystem
1820  * @wait: wait until it's all gone
1821  *
1822  * Called when unmounting the filesystem, or when the inter-node lock
1823  * manager requests DROPLOCKS because it is running out of capacity.
1824  */
1825
1826 void gfs2_gl_hash_clear(struct gfs2_sbd *sdp, int wait)
1827 {
1828         unsigned long t;
1829         unsigned int x;
1830         int cont;
1831
1832         t = jiffies;
1833
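             /* Sweep every hash bucket, demoting this filesystem's glocks to
              * the unlocked state, and keep sweeping until none remain (or
              * stop after one pass if @wait is not set).  Warn and dump the
              * lock state if this takes longer than gt_stall_secs. */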
1834         for (;;) {
1835                 cont = 0;
1836                 for (x = 0; x < GFS2_GL_HASH_SIZE; x++) {
1837                         if (examine_bucket(clear_glock, sdp, x))
1838                                 cont = 1;
1839                 }
1840
1841                 if (!wait || !cont)
1842                         break;
1843
1844                 if (time_after_eq(jiffies,
1845                                   t + gfs2_tune_get(sdp, gt_stall_secs) * HZ)) {
1846                         fs_warn(sdp, "Unmount seems to be stalled. "
1847                                      "Dumping lock state...\n");
1848                         gfs2_dump_lockstate(sdp);
1849                         t = jiffies;
1850                 }
1851
1852                 invalidate_inodes(sdp->sd_vfs);
1853                 msleep(10);
1854         }
1855 }
1856
1857 /*
1858  *  Diagnostic routines to help debug distributed deadlock
1859  */
1860
1861 /**
1862  * dump_holder - print information about a glock holder
1863  * @str: a string naming the type of holder
1864  * @gh: the glock holder
1865  *
1866  * Returns: 0 on success, -ENOBUFS when we run out of space
1867  */
1868
1869 static int dump_holder(char *str, struct gfs2_holder *gh)
1870 {
1871         unsigned int x;
1872         int error = -ENOBUFS;
1873
1874         printk(KERN_INFO "  %s\n", str);
1875         printk(KERN_INFO "    owner = %ld\n",
1876                    (gh->gh_owner) ? (long)gh->gh_owner->pid : -1);
1877         printk(KERN_INFO "    gh_state = %u\n", gh->gh_state);
1878         printk(KERN_INFO "    gh_flags =");
1879         for (x = 0; x < 32; x++)
1880                 if (gh->gh_flags & (1 << x))
1881                         printk(" %u", x);
1882         printk(" \n");
1883         printk(KERN_INFO "    error = %d\n", gh->gh_error);
1884         printk(KERN_INFO "    gh_iflags =");
1885         for (x = 0; x < 32; x++)
1886                 if (test_bit(x, &gh->gh_iflags))
1887                         printk(" %u", x);
1888         printk(" \n");
1889         print_symbol(KERN_INFO "    initialized at: %s\n", gh->gh_ip);
1890
1891         error = 0;
1892
1893         return error;
1894 }
1895
1896 /**
1897  * dump_inode - print information about an inode
1898  * @ip: the inode
1899  *
1900  * Returns: 0 on success, -ENOBUFS when we run out of space
1901  */
1902
1903 static int dump_inode(struct gfs2_inode *ip)
1904 {
1905         unsigned int x;
1906         int error = -ENOBUFS;
1907
1908         printk(KERN_INFO "  Inode:\n");
1909         printk(KERN_INFO "    num = %llu %llu\n",
1910                     (unsigned long long)ip->i_num.no_formal_ino,
1911                     (unsigned long long)ip->i_num.no_addr);
1912         printk(KERN_INFO "    type = %u\n", IF2DT(ip->i_inode.i_mode));
1913         printk(KERN_INFO "    i_flags =");
1914         for (x = 0; x < 32; x++)
1915                 if (test_bit(x, &ip->i_flags))
1916                         printk(" %u", x);
1917         printk(" \n");
1918
1919         error = 0;
1920
1921         return error;
1922 }
1923
1924 /**
1925  * dump_glock - print information about a glock
1926  * @gl: the glock
1928  *
1929  * Returns: 0 on success, -ENOBUFS when we run out of space
1930  */
1931
1932 static int dump_glock(struct gfs2_glock *gl)
1933 {
1934         struct gfs2_holder *gh;
1935         unsigned int x;
1936         int error = -ENOBUFS;
1937
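             /* gl_spin is held for the whole dump so the holder and waiter
              * lists cannot change while they are being printed. */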
1938         spin_lock(&gl->gl_spin);
1939
1940         printk(KERN_INFO "Glock 0x%p (%u, %llu)\n", gl, gl->gl_name.ln_type,
1941                (unsigned long long)gl->gl_name.ln_number);
1942         printk(KERN_INFO "  gl_flags =");
1943         for (x = 0; x < 32; x++) {
1944                 if (test_bit(x, &gl->gl_flags))
1945                         printk(" %u", x);
1946         }
1947         printk(" \n");
1948         printk(KERN_INFO "  gl_ref = %d\n", atomic_read(&gl->gl_ref));
1949         printk(KERN_INFO "  gl_state = %u\n", gl->gl_state);
1950         printk(KERN_INFO "  gl_owner = %s\n", gl->gl_owner ? gl->gl_owner->comm : "none");
1951         print_symbol(KERN_INFO "  gl_ip = %s\n", gl->gl_ip);
1952         printk(KERN_INFO "  req_gh = %s\n", (gl->gl_req_gh) ? "yes" : "no");
1953         printk(KERN_INFO "  req_bh = %s\n", (gl->gl_req_bh) ? "yes" : "no");
1954         printk(KERN_INFO "  lvb_count = %d\n", atomic_read(&gl->gl_lvb_count));
1955         printk(KERN_INFO "  object = %s\n", (gl->gl_object) ? "yes" : "no");
1956         printk(KERN_INFO "  le = %s\n",
1957                    (list_empty(&gl->gl_le.le_list)) ? "no" : "yes");
1958         printk(KERN_INFO "  reclaim = %s\n",
1959                     (list_empty(&gl->gl_reclaim)) ? "no" : "yes");
1960         if (gl->gl_aspace)
1961                 printk(KERN_INFO "  aspace = 0x%p nrpages = %lu\n", gl->gl_aspace,
1962                        gl->gl_aspace->i_mapping->nrpages);
1963         else
1964                 printk(KERN_INFO "  aspace = no\n");
1965         printk(KERN_INFO "  ail = %d\n", atomic_read(&gl->gl_ail_count));
1966         if (gl->gl_req_gh) {
1967                 error = dump_holder("Request", gl->gl_req_gh);
1968                 if (error)
1969                         goto out;
1970         }
1971         list_for_each_entry(gh, &gl->gl_holders, gh_list) {
1972                 error = dump_holder("Holder", gh);
1973                 if (error)
1974                         goto out;
1975         }
1976         list_for_each_entry(gh, &gl->gl_waiters1, gh_list) {
1977                 error = dump_holder("Waiter1", gh);
1978                 if (error)
1979                         goto out;
1980         }
1981         list_for_each_entry(gh, &gl->gl_waiters2, gh_list) {
1982                 error = dump_holder("Waiter2", gh);
1983                 if (error)
1984                         goto out;
1985         }
1986         list_for_each_entry(gh, &gl->gl_waiters3, gh_list) {
1987                 error = dump_holder("Waiter3", gh);
1988                 if (error)
1989                         goto out;
1990         }
1991         if (gl->gl_ops == &gfs2_inode_glops && gl->gl_object) {
1992                 if (!test_bit(GLF_LOCK, &gl->gl_flags) &&
1993                     list_empty(&gl->gl_holders)) {
1994                         error = dump_inode(gl->gl_object);
1995                         if (error)
1996                                 goto out;
1997                 } else {
1998                         error = -ENOBUFS;
1999                         printk(KERN_INFO "  Inode: busy\n");
2000                 }
2001         }
2002
2003         error = 0;
2004
2005 out:
2006         spin_unlock(&gl->gl_spin);
2007         return error;
2008 }
2009
2010 /**
2011  * gfs2_dump_lockstate - print out the current lockstate
2012  * @sdp: the filesystem
2013  *
2014  * Dumps the state of every glock belonging to @sdp to the console.
2015  *
2016  * Returns: 0 on success, or the first error returned by dump_glock()
2017  */
2018
2019 static int gfs2_dump_lockstate(struct gfs2_sbd *sdp)
2020 {
2021         struct gfs2_glock *gl;
2022         struct hlist_node *h;
2023         unsigned int x;
2024         int error = 0;
2025
2026         for (x = 0; x < GFS2_GL_HASH_SIZE; x++) {
2027
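                     /* Hold the bucket lock across the walk so that glocks
                      * cannot be removed from the chain while dump_glock()
                      * examines them. */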
2028                 read_lock(gl_lock_addr(x));
2029
2030                 hlist_for_each_entry(gl, h, &gl_hash_table[x].hb_list, gl_list) {
2031                         if (gl->gl_sbd != sdp)
2032                                 continue;
2033
2034                         error = dump_glock(gl);
2035                         if (error)
2036                                 break;
2037                 }
2038
2039                 read_unlock(gl_lock_addr(x));
2040
2041                 if (error)
2042                         break;
2043         }
2044
2046         return error;
2047 }
2048
2049 int __init gfs2_glock_init(void)
2050 {
2051         unsigned i;
2052         for (i = 0; i < GFS2_GL_HASH_SIZE; i++) {
2053                 INIT_HLIST_HEAD(&gl_hash_table[i].hb_list);
2054         }
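             /* gl_hash_locks only exists when GL_HASH_LOCK_SZ is defined
              * (SMP and lock-debugging configurations), so only initialise
              * it in that case. */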
2055 #ifdef GL_HASH_LOCK_SZ
2056         for (i = 0; i < GL_HASH_LOCK_SZ; i++) {
2057                 rwlock_init(&gl_hash_locks[i]);
2058         }
2059 #endif
2060         return 0;
2061 }
2062