faa6f57db7033d2e43301a0b9975f2c002dc83da
[powerpc.git] / fs / ocfs2 / dlmglue.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
25
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/smp_lock.h>
31 #include <linux/crc32.h>
32 #include <linux/kthread.h>
33 #include <linux/pagemap.h>
34 #include <linux/debugfs.h>
35 #include <linux/seq_file.h>
36
37 #include <cluster/heartbeat.h>
38 #include <cluster/nodemanager.h>
39 #include <cluster/tcp.h>
40
41 #include <dlm/dlmapi.h>
42
43 #define MLOG_MASK_PREFIX ML_DLM_GLUE
44 #include <cluster/masklog.h>
45
46 #include "ocfs2.h"
47
48 #include "alloc.h"
49 #include "dcache.h"
50 #include "dlmglue.h"
51 #include "extent_map.h"
52 #include "heartbeat.h"
53 #include "inode.h"
54 #include "journal.h"
55 #include "slot_map.h"
56 #include "super.h"
57 #include "uptodate.h"
58 #include "vote.h"
59
60 #include "buffer_head_io.h"
61
/*
 * A waiter parked on a lockres until its l_flags reach a wanted state.
 * lockres_set_flags() completes a queued waiter as soon as
 * (l_flags & mw_mask) == mw_goal.
 */
struct ocfs2_mask_waiter {
        struct list_head        mw_item;        /* link on lockres->l_mask_waiters */
        int                     mw_status;      /* value returned by ocfs2_wait_for_mask() */
        struct completion       mw_complete;    /* completed once the goal is met */
        unsigned long           mw_mask;        /* l_flags bits being watched */
        unsigned long           mw_goal;        /* required value of the watched bits */
};
69
/* ->get_osb callbacks used by the dentry and inode lock ops tables below. */
static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
72
/*
 * Return value from ocfs2_convert_worker_t functions.
 *
 * These control the precise actions of ocfs2_generic_unblock_lock()
 * and ocfs2_process_blocked_lock().
 */
enum ocfs2_unblock_action {
        UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
        UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
                                      * ->post_unlock() callback */
        UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
                                      * ->post_unlock() callback. */
};
87
/*
 * Control block passed to every ->unblock callback (see
 * struct ocfs2_lock_res_ops).
 */
struct ocfs2_unblock_ctl {
        /* NOTE(review): presumably nonzero asks for the lockres to be
         * queued again - confirm against the unblock processing code. */
        int requeue;
        /* One of the UNBLOCK_* actions defined above. */
        enum ocfs2_unblock_action unblock_action;
};
92
/*
 * Per-lock-type ->unblock callbacks; each one is installed in one of
 * the ocfs2_*_lops tables further down.
 */
static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
                              struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
                              struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
                                    struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
                                     struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
                                  struct ocfs2_unblock_ctl *ctl);

/* ->post_unlock callback for dentry locks (see ocfs2_dentry_lops). */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
                                     struct ocfs2_lock_res *lockres);
106
/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 * One table exists per lock type; a lockres points at its table via ->l_ops.
 */
struct ocfs2_lock_res_ops {
        /*
         * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
         * this callback if ->l_priv is not an ocfs2_super pointer
         */
        struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

        /*
         * Lock-type specific unblock handler. It receives an
         * ocfs2_unblock_ctl to fill in.
         */
        int  (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *);

        /*
         * Optional callback fired for the UNBLOCK_*_POST actions (see
         * enum ocfs2_unblock_action).
         */
        void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

        /*
         * Allow a lock type to add checks to determine whether it is
         * safe to downconvert a lock. Return 0 to re-queue the
         * downconvert at a later time, nonzero to continue.
         *
         * For most locks, the default checks that there are no
         * incompatible holders are sufficient.
         *
         * Called with the lockres spinlock held.
         */
        int (*check_downconvert)(struct ocfs2_lock_res *, int);

        /*
         * Allows a lock type to populate the lock value block. This
         * is called on downconvert, and when we drop a lock.
         *
         * Locks that want to use this should set LOCK_TYPE_USES_LVB
         * in the flags field.
         *
         * Called with the lockres spinlock held.
         */
        void (*set_lvb)(struct ocfs2_lock_res *);

        /*
         * LOCK_TYPE_* flags which describe the specific requirements
         * of a lock type. Descriptions of each individual flag follow.
         */
        int flags;
};
150
/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB              0x2

/*
 * Worker function type accepted by ocfs2_generic_unblock_lock(); its
 * return value is one of enum ocfs2_unblock_action.
 */
typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
                                      struct ocfs2_lock_res *lockres,
                                      struct ocfs2_unblock_ctl *ctl,
                                      ocfs2_convert_worker_t *worker);
172
/* Inode read/write lock: l_priv is the inode, no refresh and no LVB. */
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .unblock        = ocfs2_unblock_inode_lock,
        .flags          = 0,
};

/* Inode metadata lock: needs a refresh on acquisition and uses the LVB. */
static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .unblock        = ocfs2_unblock_meta,
        .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

/* Inode data lock. */
static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .unblock        = ocfs2_unblock_data,
        .flags          = 0,
};

/*
 * Superblock lock. No ->get_osb: l_priv already is the ocfs2_super
 * (see ocfs2_super_lock_res_init()).
 */
static struct ocfs2_lock_res_ops ocfs2_super_lops = {
        .unblock        = ocfs2_unblock_osb_lock,
        .flags          = LOCK_TYPE_REQUIRES_REFRESH,
};

/* Rename lock. As with the superblock lock, l_priv is the ocfs2_super. */
static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
        .unblock        = ocfs2_unblock_osb_lock,
        .flags          = 0,
};

/* Dentry lock: l_priv is the ocfs2_dentry_lock; has a post_unlock hook. */
static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
        .get_osb        = ocfs2_get_dentry_osb,
        .unblock        = ocfs2_unblock_dentry_lock,
        .post_unlock    = ocfs2_dentry_post_unlock,
        .flags          = 0,
};
207
208 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
209 {
210         return lockres->l_type == OCFS2_LOCK_TYPE_META ||
211                 lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
212                 lockres->l_type == OCFS2_LOCK_TYPE_RW;
213 }
214
215 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
216 {
217         BUG_ON(!ocfs2_is_inode_lock(lockres));
218
219         return (struct inode *) lockres->l_priv;
220 }
221
222 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
223 {
224         BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
225
226         return (struct ocfs2_dentry_lock *)lockres->l_priv;
227 }
228
229 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
230 {
231         if (lockres->l_ops->get_osb)
232                 return lockres->l_ops->get_osb(lockres);
233
234         return (struct ocfs2_super *)lockres->l_priv;
235 }
236
/* Prototypes for file-local (static) locking helpers. */
static int ocfs2_lock_create(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres,
                             int level,
                             int dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                                                     int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
                                 struct ocfs2_lock_res *lockres,
                                 int level);
/* Handlers run from ocfs2_locking_ast(), one per l_action value. */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
/* Called from ocfs2_blocking_ast() with the lockres spinlock held. */
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                                                int convert);
/*
 * Log a DLM failure at ML_ERROR level.
 *
 * _func:    name of the DLM call that failed (string)
 * _stat:    the enum dlm_status it returned (evaluated twice)
 * _lockres: the struct ocfs2_lock_res * involved
 *
 * Arguments are parenthesized so that expressions such as "&res" or
 * "cond ? a : b" expand correctly.
 */
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {        \
        mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "  \
                "resource %s: %s\n", dlm_errname(_stat), (_func),       \
                (_lockres)->l_name, dlm_errmsg(_stat));         \
} while (0)
/* More prototypes for file-local (static) helpers. */
static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
                                 struct ocfs2_lock_res *lockres);
static int ocfs2_meta_lock_update(struct inode *inode,
                                  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
                                                  struct ocfs2_lock_res *lockres,
                                                  int new_level);
268
269 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
270                                   u64 blkno,
271                                   u32 generation,
272                                   char *name)
273 {
274         int len;
275
276         mlog_entry_void();
277
278         BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
279
280         len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
281                        ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
282                        (long long)blkno, generation);
283
284         BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
285
286         mlog(0, "built lock resource with name: %s\n", name);
287
288         mlog_exit_void();
289 }
290
/* Taken around updates to the lockres debug tracking list. */
static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
292
293 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
294                                        struct ocfs2_dlm_debug *dlm_debug)
295 {
296         mlog(0, "Add tracking for lockres %s\n", res->l_name);
297
298         spin_lock(&ocfs2_dlm_tracking_lock);
299         list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
300         spin_unlock(&ocfs2_dlm_tracking_lock);
301 }
302
303 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
304 {
305         spin_lock(&ocfs2_dlm_tracking_lock);
306         if (!list_empty(&res->l_debug_list))
307                 list_del_init(&res->l_debug_list);
308         spin_unlock(&ocfs2_dlm_tracking_lock);
309 }
310
311 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
312                                        struct ocfs2_lock_res *res,
313                                        enum ocfs2_lock_type type,
314                                        struct ocfs2_lock_res_ops *ops,
315                                        void *priv)
316 {
317         res->l_type          = type;
318         res->l_ops           = ops;
319         res->l_priv          = priv;
320
321         res->l_level         = LKM_IVMODE;
322         res->l_requested     = LKM_IVMODE;
323         res->l_blocking      = LKM_IVMODE;
324         res->l_action        = OCFS2_AST_INVALID;
325         res->l_unlock_action = OCFS2_UNLOCK_INVALID;
326
327         res->l_flags         = OCFS2_LOCK_INITIALIZED;
328
329         ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
330 }
331
332 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
333 {
334         /* This also clears out the lock status block */
335         memset(res, 0, sizeof(struct ocfs2_lock_res));
336         spin_lock_init(&res->l_lock);
337         init_waitqueue_head(&res->l_event);
338         INIT_LIST_HEAD(&res->l_blocked_list);
339         INIT_LIST_HEAD(&res->l_mask_waiters);
340 }
341
342 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
343                                enum ocfs2_lock_type type,
344                                unsigned int generation,
345                                struct inode *inode)
346 {
347         struct ocfs2_lock_res_ops *ops;
348
349         switch(type) {
350                 case OCFS2_LOCK_TYPE_RW:
351                         ops = &ocfs2_inode_rw_lops;
352                         break;
353                 case OCFS2_LOCK_TYPE_META:
354                         ops = &ocfs2_inode_meta_lops;
355                         break;
356                 case OCFS2_LOCK_TYPE_DATA:
357                         ops = &ocfs2_inode_data_lops;
358                         break;
359                 default:
360                         mlog_bug_on_msg(1, "type: %d\n", type);
361                         ops = NULL; /* thanks, gcc */
362                         break;
363         };
364
365         ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
366                               generation, res->l_name);
367         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
368 }
369
370 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
371 {
372         struct inode *inode = ocfs2_lock_res_inode(lockres);
373
374         return OCFS2_SB(inode->i_sb);
375 }
376
377 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
378 {
379         __be64 inode_blkno_be;
380
381         memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
382                sizeof(__be64));
383
384         return be64_to_cpu(inode_blkno_be);
385 }
386
387 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
388 {
389         struct ocfs2_dentry_lock *dl = lockres->l_priv;
390
391         return OCFS2_SB(dl->dl_inode->i_sb);
392 }
393
394 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
395                                 u64 parent, struct inode *inode)
396 {
397         int len;
398         u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
399         __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
400         struct ocfs2_lock_res *lockres = &dl->dl_lockres;
401
402         ocfs2_lock_res_init_once(lockres);
403
404         /*
405          * Unfortunately, the standard lock naming scheme won't work
406          * here because we have two 16 byte values to use. Instead,
407          * we'll stuff the inode number as a binary value. We still
408          * want error prints to show something without garbling the
409          * display, so drop a null byte in there before the inode
410          * number. A future version of OCFS2 will likely use all
411          * binary lock names. The stringified names have been a
412          * tremendous aid in debugging, but now that the debugfs
413          * interface exists, we can mangle things there if need be.
414          *
415          * NOTE: We also drop the standard "pad" value (the total lock
416          * name size stays the same though - the last part is all
417          * zeros due to the memset in ocfs2_lock_res_init_once()
418          */
419         len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
420                        "%c%016llx",
421                        ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
422                        (long long)parent);
423
424         BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
425
426         memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
427                sizeof(__be64));
428
429         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
430                                    OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
431                                    dl);
432 }
433
434 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
435                                       struct ocfs2_super *osb)
436 {
437         /* Superblock lockres doesn't come from a slab so we call init
438          * once on it manually.  */
439         ocfs2_lock_res_init_once(res);
440         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
441                               0, res->l_name);
442         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
443                                    &ocfs2_super_lops, osb);
444 }
445
446 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
447                                        struct ocfs2_super *osb)
448 {
449         /* Rename lockres doesn't come from a slab so we call init
450          * once on it manually.  */
451         ocfs2_lock_res_init_once(res);
452         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
453         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
454                                    &ocfs2_rename_lops, osb);
455 }
456
457 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
458 {
459         mlog_entry_void();
460
461         if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
462                 return;
463
464         ocfs2_remove_lockres_tracking(res);
465
466         mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
467                         "Lockres %s is on the blocked list\n",
468                         res->l_name);
469         mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
470                         "Lockres %s has mask waiters pending\n",
471                         res->l_name);
472         mlog_bug_on_msg(spin_is_locked(&res->l_lock),
473                         "Lockres %s is locked\n",
474                         res->l_name);
475         mlog_bug_on_msg(res->l_ro_holders,
476                         "Lockres %s has %u ro holders\n",
477                         res->l_name, res->l_ro_holders);
478         mlog_bug_on_msg(res->l_ex_holders,
479                         "Lockres %s has %u ex holders\n",
480                         res->l_name, res->l_ex_holders);
481
482         /* Need to clear out the lock status block for the dlm */
483         memset(&res->l_lksb, 0, sizeof(res->l_lksb));
484
485         res->l_flags = 0UL;
486         mlog_exit_void();
487 }
488
489 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
490                                      int level)
491 {
492         mlog_entry_void();
493
494         BUG_ON(!lockres);
495
496         switch(level) {
497         case LKM_EXMODE:
498                 lockres->l_ex_holders++;
499                 break;
500         case LKM_PRMODE:
501                 lockres->l_ro_holders++;
502                 break;
503         default:
504                 BUG();
505         }
506
507         mlog_exit_void();
508 }
509
510 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
511                                      int level)
512 {
513         mlog_entry_void();
514
515         BUG_ON(!lockres);
516
517         switch(level) {
518         case LKM_EXMODE:
519                 BUG_ON(!lockres->l_ex_holders);
520                 lockres->l_ex_holders--;
521                 break;
522         case LKM_PRMODE:
523                 BUG_ON(!lockres->l_ro_holders);
524                 lockres->l_ro_holders--;
525                 break;
526         default:
527                 BUG();
528         }
529         mlog_exit_void();
530 }
531
532 /* WARNING: This function lives in a world where the only three lock
533  * levels are EX, PR, and NL. It *will* have to be adjusted when more
534  * lock types are added. */
535 static inline int ocfs2_highest_compat_lock_level(int level)
536 {
537         int new_level = LKM_EXMODE;
538
539         if (level == LKM_EXMODE)
540                 new_level = LKM_NLMODE;
541         else if (level == LKM_PRMODE)
542                 new_level = LKM_PRMODE;
543         return new_level;
544 }
545
546 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
547                               unsigned long newflags)
548 {
549         struct list_head *pos, *tmp;
550         struct ocfs2_mask_waiter *mw;
551
552         assert_spin_locked(&lockres->l_lock);
553
554         lockres->l_flags = newflags;
555
556         list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
557                 mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
558                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
559                         continue;
560
561                 list_del_init(&mw->mw_item);
562                 mw->mw_status = 0;
563                 complete(&mw->mw_complete);
564         }
565 }
566 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
567 {
568         lockres_set_flags(lockres, lockres->l_flags | or);
569 }
570 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
571                                 unsigned long clear)
572 {
573         lockres_set_flags(lockres, lockres->l_flags & ~clear);
574 }
575
576 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
577 {
578         mlog_entry_void();
579
580         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
581         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
582         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
583         BUG_ON(lockres->l_blocking <= LKM_NLMODE);
584
585         lockres->l_level = lockres->l_requested;
586         if (lockres->l_level <=
587             ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
588                 lockres->l_blocking = LKM_NLMODE;
589                 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
590         }
591         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
592
593         mlog_exit_void();
594 }
595
596 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
597 {
598         mlog_entry_void();
599
600         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
601         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
602
603         /* Convert from RO to EX doesn't really need anything as our
604          * information is already up to data. Convert from NL to
605          * *anything* however should mark ourselves as needing an
606          * update */
607         if (lockres->l_level == LKM_NLMODE &&
608             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
609                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
610
611         lockres->l_level = lockres->l_requested;
612         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
613
614         mlog_exit_void();
615 }
616
617 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
618 {
619         mlog_entry_void();
620
621         BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
622         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
623
624         if (lockres->l_requested > LKM_NLMODE &&
625             !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
626             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
627                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
628
629         lockres->l_level = lockres->l_requested;
630         lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
631         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
632
633         mlog_exit_void();
634 }
635
636 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
637                                      int level)
638 {
639         int needs_downconvert = 0;
640         mlog_entry_void();
641
642         assert_spin_locked(&lockres->l_lock);
643
644         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
645
646         if (level > lockres->l_blocking) {
647                 /* only schedule a downconvert if we haven't already scheduled
648                  * one that goes low enough to satisfy the level we're
649                  * blocking.  this also catches the case where we get
650                  * duplicate BASTs */
651                 if (ocfs2_highest_compat_lock_level(level) <
652                     ocfs2_highest_compat_lock_level(lockres->l_blocking))
653                         needs_downconvert = 1;
654
655                 lockres->l_blocking = level;
656         }
657
658         mlog_exit(needs_downconvert);
659         return needs_downconvert;
660 }
661
662 static void ocfs2_blocking_ast(void *opaque, int level)
663 {
664         struct ocfs2_lock_res *lockres = opaque;
665         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
666         int needs_downconvert;
667         unsigned long flags;
668
669         BUG_ON(level <= LKM_NLMODE);
670
671         mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
672              lockres->l_name, level, lockres->l_level,
673              ocfs2_lock_type_string(lockres->l_type));
674
675         spin_lock_irqsave(&lockres->l_lock, flags);
676         needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
677         if (needs_downconvert)
678                 ocfs2_schedule_blocked_lock(osb, lockres);
679         spin_unlock_irqrestore(&lockres->l_lock, flags);
680
681         wake_up(&lockres->l_event);
682
683         ocfs2_kick_vote_thread(osb);
684 }
685
686 static void ocfs2_locking_ast(void *opaque)
687 {
688         struct ocfs2_lock_res *lockres = opaque;
689         struct dlm_lockstatus *lksb = &lockres->l_lksb;
690         unsigned long flags;
691
692         spin_lock_irqsave(&lockres->l_lock, flags);
693
694         if (lksb->status != DLM_NORMAL) {
695                 mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
696                      lockres->l_name, lksb->status);
697                 spin_unlock_irqrestore(&lockres->l_lock, flags);
698                 return;
699         }
700
701         switch(lockres->l_action) {
702         case OCFS2_AST_ATTACH:
703                 ocfs2_generic_handle_attach_action(lockres);
704                 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
705                 break;
706         case OCFS2_AST_CONVERT:
707                 ocfs2_generic_handle_convert_action(lockres);
708                 break;
709         case OCFS2_AST_DOWNCONVERT:
710                 ocfs2_generic_handle_downconvert_action(lockres);
711                 break;
712         default:
713                 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
714                      "lockres flags = 0x%lx, unlock action: %u\n",
715                      lockres->l_name, lockres->l_action, lockres->l_flags,
716                      lockres->l_unlock_action);
717                 BUG();
718         }
719
720         /* set it to something invalid so if we get called again we
721          * can catch it. */
722         lockres->l_action = OCFS2_AST_INVALID;
723
724         wake_up(&lockres->l_event);
725         spin_unlock_irqrestore(&lockres->l_lock, flags);
726 }
727
728 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
729                                                 int convert)
730 {
731         unsigned long flags;
732
733         mlog_entry_void();
734         spin_lock_irqsave(&lockres->l_lock, flags);
735         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
736         if (convert)
737                 lockres->l_action = OCFS2_AST_INVALID;
738         else
739                 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
740         spin_unlock_irqrestore(&lockres->l_lock, flags);
741
742         wake_up(&lockres->l_event);
743         mlog_exit_void();
744 }
745
746 /* Note: If we detect another process working on the lock (i.e.,
747  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
748  * to do the right thing in that case.
749  */
750 static int ocfs2_lock_create(struct ocfs2_super *osb,
751                              struct ocfs2_lock_res *lockres,
752                              int level,
753                              int dlm_flags)
754 {
755         int ret = 0;
756         enum dlm_status status;
757         unsigned long flags;
758
759         mlog_entry_void();
760
761         mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
762              dlm_flags);
763
764         spin_lock_irqsave(&lockres->l_lock, flags);
765         if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
766             (lockres->l_flags & OCFS2_LOCK_BUSY)) {
767                 spin_unlock_irqrestore(&lockres->l_lock, flags);
768                 goto bail;
769         }
770
771         lockres->l_action = OCFS2_AST_ATTACH;
772         lockres->l_requested = level;
773         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
774         spin_unlock_irqrestore(&lockres->l_lock, flags);
775
776         status = dlmlock(osb->dlm,
777                          level,
778                          &lockres->l_lksb,
779                          dlm_flags,
780                          lockres->l_name,
781                          OCFS2_LOCK_ID_MAX_LEN - 1,
782                          ocfs2_locking_ast,
783                          lockres,
784                          ocfs2_blocking_ast);
785         if (status != DLM_NORMAL) {
786                 ocfs2_log_dlm_error("dlmlock", status, lockres);
787                 ret = -EINVAL;
788                 ocfs2_recover_from_dlm_error(lockres, 1);
789         }
790
791         mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
792
793 bail:
794         mlog_exit(ret);
795         return ret;
796 }
797
798 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
799                                         int flag)
800 {
801         unsigned long flags;
802         int ret;
803
804         spin_lock_irqsave(&lockres->l_lock, flags);
805         ret = lockres->l_flags & flag;
806         spin_unlock_irqrestore(&lockres->l_lock, flags);
807
808         return ret;
809 }
810
811 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
812
813 {
814         wait_event(lockres->l_event,
815                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
816 }
817
818 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
819
820 {
821         wait_event(lockres->l_event,
822                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
823 }
824
825 /* predict what lock level we'll be dropping down to on behalf
826  * of another node, and return true if the currently wanted
827  * level will be compatible with it. */
828 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
829                                                      int wanted)
830 {
831         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
832
833         return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
834 }
835
836 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
837 {
838         INIT_LIST_HEAD(&mw->mw_item);
839         init_completion(&mw->mw_complete);
840 }
841
842 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
843 {
844         wait_for_completion(&mw->mw_complete);
845         /* Re-arm the completion in case we want to wait on it again */
846         INIT_COMPLETION(mw->mw_complete);
847         return mw->mw_status;
848 }
849
850 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
851                                     struct ocfs2_mask_waiter *mw,
852                                     unsigned long mask,
853                                     unsigned long goal)
854 {
855         BUG_ON(!list_empty(&mw->mw_item));
856
857         assert_spin_locked(&lockres->l_lock);
858
859         list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
860         mw->mw_mask = mask;
861         mw->mw_goal = goal;
862 }
863
864 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
865  * if the mask still hadn't reached its goal */
866 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
867                                       struct ocfs2_mask_waiter *mw)
868 {
869         unsigned long flags;
870         int ret = 0;
871
872         spin_lock_irqsave(&lockres->l_lock, flags);
873         if (!list_empty(&mw->mw_item)) {
874                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
875                         ret = -EBUSY;
876
877                 list_del_init(&mw->mw_item);
878                 init_completion(&mw->mw_complete);
879         }
880         spin_unlock_irqrestore(&lockres->l_lock, flags);
881
882         return ret;
883
884 }
885
886 static int ocfs2_cluster_lock(struct ocfs2_super *osb,
887                               struct ocfs2_lock_res *lockres,
888                               int level,
889                               int lkm_flags,
890                               int arg_flags)
891 {
892         struct ocfs2_mask_waiter mw;
893         enum dlm_status status;
894         int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
895         int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
896         unsigned long flags;
897
898         mlog_entry_void();
899
900         ocfs2_init_mask_waiter(&mw);
901
902         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
903                 lkm_flags |= LKM_VALBLK;
904
905 again:
906         wait = 0;
907
908         if (catch_signals && signal_pending(current)) {
909                 ret = -ERESTARTSYS;
910                 goto out;
911         }
912
913         spin_lock_irqsave(&lockres->l_lock, flags);
914
915         mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
916                         "Cluster lock called on freeing lockres %s! flags "
917                         "0x%lx\n", lockres->l_name, lockres->l_flags);
918
919         /* We only compare against the currently granted level
920          * here. If the lock is blocked waiting on a downconvert,
921          * we'll get caught below. */
922         if (lockres->l_flags & OCFS2_LOCK_BUSY &&
923             level > lockres->l_level) {
924                 /* is someone sitting in dlm_lock? If so, wait on
925                  * them. */
926                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
927                 wait = 1;
928                 goto unlock;
929         }
930
931         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
932                 /* lock has not been created yet. */
933                 spin_unlock_irqrestore(&lockres->l_lock, flags);
934
935                 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
936                 if (ret < 0) {
937                         mlog_errno(ret);
938                         goto out;
939                 }
940                 goto again;
941         }
942
943         if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
944             !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
945                 /* is the lock is currently blocked on behalf of
946                  * another node */
947                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
948                 wait = 1;
949                 goto unlock;
950         }
951
952         if (level > lockres->l_level) {
953                 if (lockres->l_action != OCFS2_AST_INVALID)
954                         mlog(ML_ERROR, "lockres %s has action %u pending\n",
955                              lockres->l_name, lockres->l_action);
956
957                 lockres->l_action = OCFS2_AST_CONVERT;
958                 lockres->l_requested = level;
959                 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
960                 spin_unlock_irqrestore(&lockres->l_lock, flags);
961
962                 BUG_ON(level == LKM_IVMODE);
963                 BUG_ON(level == LKM_NLMODE);
964
965                 mlog(0, "lock %s, convert from %d to level = %d\n",
966                      lockres->l_name, lockres->l_level, level);
967
968                 /* call dlm_lock to upgrade lock now */
969                 status = dlmlock(osb->dlm,
970                                  level,
971                                  &lockres->l_lksb,
972                                  lkm_flags|LKM_CONVERT,
973                                  lockres->l_name,
974                                  OCFS2_LOCK_ID_MAX_LEN - 1,
975                                  ocfs2_locking_ast,
976                                  lockres,
977                                  ocfs2_blocking_ast);
978                 if (status != DLM_NORMAL) {
979                         if ((lkm_flags & LKM_NOQUEUE) &&
980                             (status == DLM_NOTQUEUED))
981                                 ret = -EAGAIN;
982                         else {
983                                 ocfs2_log_dlm_error("dlmlock", status,
984                                                     lockres);
985                                 ret = -EINVAL;
986                         }
987                         ocfs2_recover_from_dlm_error(lockres, 1);
988                         goto out;
989                 }
990
991                 mlog(0, "lock %s, successfull return from dlmlock\n",
992                      lockres->l_name);
993
994                 /* At this point we've gone inside the dlm and need to
995                  * complete our work regardless. */
996                 catch_signals = 0;
997
998                 /* wait for busy to clear and carry on */
999                 goto again;
1000         }
1001
1002         /* Ok, if we get here then we're good to go. */
1003         ocfs2_inc_holders(lockres, level);
1004
1005         ret = 0;
1006 unlock:
1007         spin_unlock_irqrestore(&lockres->l_lock, flags);
1008 out:
1009         /*
1010          * This is helping work around a lock inversion between the page lock
1011          * and dlm locks.  One path holds the page lock while calling aops
1012          * which block acquiring dlm locks.  The voting thread holds dlm
1013          * locks while acquiring page locks while down converting data locks.
1014          * This block is helping an aop path notice the inversion and back
1015          * off to unlock its page lock before trying the dlm lock again.
1016          */
1017         if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1018             mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1019                 wait = 0;
1020                 if (lockres_remove_mask_waiter(lockres, &mw))
1021                         ret = -EAGAIN;
1022                 else
1023                         goto again;
1024         }
1025         if (wait) {
1026                 ret = ocfs2_wait_for_mask(&mw);
1027                 if (ret == 0)
1028                         goto again;
1029                 mlog_errno(ret);
1030         }
1031
1032         mlog_exit(ret);
1033         return ret;
1034 }
1035
1036 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1037                                  struct ocfs2_lock_res *lockres,
1038                                  int level)
1039 {
1040         unsigned long flags;
1041
1042         mlog_entry_void();
1043         spin_lock_irqsave(&lockres->l_lock, flags);
1044         ocfs2_dec_holders(lockres, level);
1045         ocfs2_vote_on_unlock(osb, lockres);
1046         spin_unlock_irqrestore(&lockres->l_lock, flags);
1047         mlog_exit_void();
1048 }
1049
1050 int ocfs2_create_new_lock(struct ocfs2_super *osb,
1051                           struct ocfs2_lock_res *lockres,
1052                           int ex,
1053                           int local)
1054 {
1055         int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1056         unsigned long flags;
1057         int lkm_flags = local ? LKM_LOCAL : 0;
1058
1059         spin_lock_irqsave(&lockres->l_lock, flags);
1060         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1061         lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1062         spin_unlock_irqrestore(&lockres->l_lock, flags);
1063
1064         return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1065 }
1066
1067 /* Grants us an EX lock on the data and metadata resources, skipping
1068  * the normal cluster directory lookup. Use this ONLY on newly created
1069  * inodes which other nodes can't possibly see, and which haven't been
1070  * hashed in the inode hash yet. This can give us a good performance
1071  * increase as it'll skip the network broadcast normally associated
1072  * with creating a new lock resource. */
1073 int ocfs2_create_new_inode_locks(struct inode *inode)
1074 {
1075         int ret;
1076         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1077
1078         BUG_ON(!inode);
1079         BUG_ON(!ocfs2_inode_is_new(inode));
1080
1081         mlog_entry_void();
1082
1083         mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1084
1085         /* NOTE: That we don't increment any of the holder counts, nor
1086          * do we add anything to a journal handle. Since this is
1087          * supposed to be a new inode which the cluster doesn't know
1088          * about yet, there is no need to.  As far as the LVB handling
1089          * is concerned, this is basically like acquiring an EX lock
1090          * on a resource which has an invalid one -- we'll set it
1091          * valid when we release the EX. */
1092
1093         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1094         if (ret) {
1095                 mlog_errno(ret);
1096                 goto bail;
1097         }
1098
1099         /*
1100          * We don't want to use LKM_LOCAL on a meta data lock as they
1101          * don't use a generation in their lock names.
1102          */
1103         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
1104         if (ret) {
1105                 mlog_errno(ret);
1106                 goto bail;
1107         }
1108
1109         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
1110         if (ret) {
1111                 mlog_errno(ret);
1112                 goto bail;
1113         }
1114
1115 bail:
1116         mlog_exit(ret);
1117         return ret;
1118 }
1119
1120 int ocfs2_rw_lock(struct inode *inode, int write)
1121 {
1122         int status, level;
1123         struct ocfs2_lock_res *lockres;
1124
1125         BUG_ON(!inode);
1126
1127         mlog_entry_void();
1128
1129         mlog(0, "inode %llu take %s RW lock\n",
1130              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1131              write ? "EXMODE" : "PRMODE");
1132
1133         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1134
1135         level = write ? LKM_EXMODE : LKM_PRMODE;
1136
1137         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1138                                     0);
1139         if (status < 0)
1140                 mlog_errno(status);
1141
1142         mlog_exit(status);
1143         return status;
1144 }
1145
1146 void ocfs2_rw_unlock(struct inode *inode, int write)
1147 {
1148         int level = write ? LKM_EXMODE : LKM_PRMODE;
1149         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1150
1151         mlog_entry_void();
1152
1153         mlog(0, "inode %llu drop %s RW lock\n",
1154              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1155              write ? "EXMODE" : "PRMODE");
1156
1157         ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1158
1159         mlog_exit_void();
1160 }
1161
1162 int ocfs2_data_lock_full(struct inode *inode,
1163                          int write,
1164                          int arg_flags)
1165 {
1166         int status = 0, level;
1167         struct ocfs2_lock_res *lockres;
1168
1169         BUG_ON(!inode);
1170
1171         mlog_entry_void();
1172
1173         mlog(0, "inode %llu take %s DATA lock\n",
1174              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1175              write ? "EXMODE" : "PRMODE");
1176
1177         /* We'll allow faking a readonly data lock for
1178          * rodevices. */
1179         if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1180                 if (write) {
1181                         status = -EROFS;
1182                         mlog_errno(status);
1183                 }
1184                 goto out;
1185         }
1186
1187         lockres = &OCFS2_I(inode)->ip_data_lockres;
1188
1189         level = write ? LKM_EXMODE : LKM_PRMODE;
1190
1191         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
1192                                     0, arg_flags);
1193         if (status < 0 && status != -EAGAIN)
1194                 mlog_errno(status);
1195
1196 out:
1197         mlog_exit(status);
1198         return status;
1199 }
1200
1201 /* see ocfs2_meta_lock_with_page() */
1202 int ocfs2_data_lock_with_page(struct inode *inode,
1203                               int write,
1204                               struct page *page)
1205 {
1206         int ret;
1207
1208         ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1209         if (ret == -EAGAIN) {
1210                 unlock_page(page);
1211                 if (ocfs2_data_lock(inode, write) == 0)
1212                         ocfs2_data_unlock(inode, write);
1213                 ret = AOP_TRUNCATED_PAGE;
1214         }
1215
1216         return ret;
1217 }
1218
1219 static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1220                                  struct ocfs2_lock_res *lockres)
1221 {
1222         int kick = 0;
1223
1224         mlog_entry_void();
1225
1226         /* If we know that another node is waiting on our lock, kick
1227          * the vote thread * pre-emptively when we reach a release
1228          * condition. */
1229         if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1230                 switch(lockres->l_blocking) {
1231                 case LKM_EXMODE:
1232                         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1233                                 kick = 1;
1234                         break;
1235                 case LKM_PRMODE:
1236                         if (!lockres->l_ex_holders)
1237                                 kick = 1;
1238                         break;
1239                 default:
1240                         BUG();
1241                 }
1242         }
1243
1244         if (kick)
1245                 ocfs2_kick_vote_thread(osb);
1246
1247         mlog_exit_void();
1248 }
1249
1250 void ocfs2_data_unlock(struct inode *inode,
1251                        int write)
1252 {
1253         int level = write ? LKM_EXMODE : LKM_PRMODE;
1254         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1255
1256         mlog_entry_void();
1257
1258         mlog(0, "inode %llu drop %s DATA lock\n",
1259              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1260              write ? "EXMODE" : "PRMODE");
1261
1262         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1263                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1264
1265         mlog_exit_void();
1266 }
1267
1268 #define OCFS2_SEC_BITS   34
1269 #define OCFS2_SEC_SHIFT  (64 - 34)
1270 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1271
1272 /* LVB only has room for 64 bits of time here so we pack it for
1273  * now. */
1274 static u64 ocfs2_pack_timespec(struct timespec *spec)
1275 {
1276         u64 res;
1277         u64 sec = spec->tv_sec;
1278         u32 nsec = spec->tv_nsec;
1279
1280         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1281
1282         return res;
1283 }
1284
1285 /* Call this with the lockres locked. I am reasonably sure we don't
1286  * need ip_lock in this function as anyone who would be changing those
1287  * values is supposed to be blocked in ocfs2_meta_lock right now. */
1288 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1289 {
1290         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1291         struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1292         struct ocfs2_meta_lvb *lvb;
1293
1294         mlog_entry_void();
1295
1296         lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1297
1298         /*
1299          * Invalidate the LVB of a deleted inode - this way other
1300          * nodes are forced to go to disk and discover the new inode
1301          * status.
1302          */
1303         if (oi->ip_flags & OCFS2_INODE_DELETED) {
1304                 lvb->lvb_version = 0;
1305                 goto out;
1306         }
1307
1308         lvb->lvb_version   = OCFS2_LVB_VERSION;
1309         lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
1310         lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1311         lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1312         lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1313         lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1314         lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1315         lvb->lvb_iatime_packed  =
1316                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1317         lvb->lvb_ictime_packed =
1318                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1319         lvb->lvb_imtime_packed =
1320                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1321         lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1322         lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1323
1324 out:
1325         mlog_meta_lvb(0, lockres);
1326
1327         mlog_exit_void();
1328 }
1329
1330 static void ocfs2_unpack_timespec(struct timespec *spec,
1331                                   u64 packed_time)
1332 {
1333         spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1334         spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1335 }
1336
1337 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1338 {
1339         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1340         struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1341         struct ocfs2_meta_lvb *lvb;
1342
1343         mlog_entry_void();
1344
1345         mlog_meta_lvb(0, lockres);
1346
1347         lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1348
1349         /* We're safe here without the lockres lock... */
1350         spin_lock(&oi->ip_lock);
1351         oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
1352         i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
1353
1354         oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
1355         ocfs2_set_inode_flags(inode);
1356
1357         /* fast-symlinks are a special case */
1358         if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
1359                 inode->i_blocks = 0;
1360         else
1361                 inode->i_blocks =
1362                         ocfs2_align_bytes_to_sectors(i_size_read(inode));
1363
1364         inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
1365         inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
1366         inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
1367         inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
1368         ocfs2_unpack_timespec(&inode->i_atime,
1369                               be64_to_cpu(lvb->lvb_iatime_packed));
1370         ocfs2_unpack_timespec(&inode->i_mtime,
1371                               be64_to_cpu(lvb->lvb_imtime_packed));
1372         ocfs2_unpack_timespec(&inode->i_ctime,
1373                               be64_to_cpu(lvb->lvb_ictime_packed));
1374         spin_unlock(&oi->ip_lock);
1375
1376         mlog_exit_void();
1377 }
1378
1379 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1380                                               struct ocfs2_lock_res *lockres)
1381 {
1382         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1383
1384         if (lvb->lvb_version == OCFS2_LVB_VERSION
1385             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1386                 return 1;
1387         return 0;
1388 }
1389
1390 /* Determine whether a lock resource needs to be refreshed, and
1391  * arbitrate who gets to refresh it.
1392  *
1393  *   0 means no refresh needed.
1394  *
1395  *   > 0 means you need to refresh this and you MUST call
1396  *   ocfs2_complete_lock_res_refresh afterwards. */
1397 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1398 {
1399         unsigned long flags;
1400         int status = 0;
1401
1402         mlog_entry_void();
1403
1404 refresh_check:
1405         spin_lock_irqsave(&lockres->l_lock, flags);
1406         if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1407                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1408                 goto bail;
1409         }
1410
1411         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1412                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1413
1414                 ocfs2_wait_on_refreshing_lock(lockres);
1415                 goto refresh_check;
1416         }
1417
1418         /* Ok, I'll be the one to refresh this lock. */
1419         lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1420         spin_unlock_irqrestore(&lockres->l_lock, flags);
1421
1422         status = 1;
1423 bail:
1424         mlog_exit(status);
1425         return status;
1426 }
1427
1428 /* If status is non zero, I'll mark it as not being in refresh
1429  * anymroe, but i won't clear the needs refresh flag. */
1430 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1431                                                    int status)
1432 {
1433         unsigned long flags;
1434         mlog_entry_void();
1435
1436         spin_lock_irqsave(&lockres->l_lock, flags);
1437         lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1438         if (!status)
1439                 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1440         spin_unlock_irqrestore(&lockres->l_lock, flags);
1441
1442         wake_up(&lockres->l_event);
1443
1444         mlog_exit_void();
1445 }
1446
1447 /* may or may not return a bh if it went to disk. */
1448 static int ocfs2_meta_lock_update(struct inode *inode,
1449                                   struct buffer_head **bh)
1450 {
1451         int status = 0;
1452         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1453         struct ocfs2_lock_res *lockres;
1454         struct ocfs2_dinode *fe;
1455
1456         mlog_entry_void();
1457
1458         spin_lock(&oi->ip_lock);
1459         if (oi->ip_flags & OCFS2_INODE_DELETED) {
1460                 mlog(0, "Orphaned inode %llu was deleted while we "
1461                      "were waiting on a lock. ip_flags = 0x%x\n",
1462                      (unsigned long long)oi->ip_blkno, oi->ip_flags);
1463                 spin_unlock(&oi->ip_lock);
1464                 status = -ENOENT;
1465                 goto bail;
1466         }
1467         spin_unlock(&oi->ip_lock);
1468
1469         lockres = &oi->ip_meta_lockres;
1470
1471         if (!ocfs2_should_refresh_lock_res(lockres))
1472                 goto bail;
1473
1474         /* This will discard any caching information we might have had
1475          * for the inode metadata. */
1476         ocfs2_metadata_cache_purge(inode);
1477
1478         /* will do nothing for inode types that don't use the extent
1479          * map (directories, bitmap files, etc) */
1480         ocfs2_extent_map_trunc(inode, 0);
1481
1482         if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
1483                 mlog(0, "Trusting LVB on inode %llu\n",
1484                      (unsigned long long)oi->ip_blkno);
1485                 ocfs2_refresh_inode_from_lvb(inode);
1486         } else {
1487                 /* Boo, we have to go to disk. */
1488                 /* read bh, cast, ocfs2_refresh_inode */
1489                 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
1490                                           bh, OCFS2_BH_CACHED, inode);
1491                 if (status < 0) {
1492                         mlog_errno(status);
1493                         goto bail_refresh;
1494                 }
1495                 fe = (struct ocfs2_dinode *) (*bh)->b_data;
1496
1497                 /* This is a good chance to make sure we're not
1498                  * locking an invalid object.
1499                  *
1500                  * We bug on a stale inode here because we checked
1501                  * above whether it was wiped from disk. The wiping
1502                  * node provides a guarantee that we receive that
1503                  * message and can mark the inode before dropping any
1504                  * locks associated with it. */
1505                 if (!OCFS2_IS_VALID_DINODE(fe)) {
1506                         OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
1507                         status = -EIO;
1508                         goto bail_refresh;
1509                 }
1510                 mlog_bug_on_msg(inode->i_generation !=
1511                                 le32_to_cpu(fe->i_generation),
1512                                 "Invalid dinode %llu disk generation: %u "
1513                                 "inode->i_generation: %u\n",
1514                                 (unsigned long long)oi->ip_blkno,
1515                                 le32_to_cpu(fe->i_generation),
1516                                 inode->i_generation);
1517                 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
1518                                 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
1519                                 "Stale dinode %llu dtime: %llu flags: 0x%x\n",
1520                                 (unsigned long long)oi->ip_blkno,
1521                                 (unsigned long long)le64_to_cpu(fe->i_dtime),
1522                                 le32_to_cpu(fe->i_flags));
1523
1524                 ocfs2_refresh_inode(inode, fe);
1525         }
1526
1527         status = 0;
1528 bail_refresh:
1529         ocfs2_complete_lock_res_refresh(lockres, status);
1530 bail:
1531         mlog_exit(status);
1532         return status;
1533 }
1534
1535 static int ocfs2_assign_bh(struct inode *inode,
1536                            struct buffer_head **ret_bh,
1537                            struct buffer_head *passed_bh)
1538 {
1539         int status;
1540
1541         if (passed_bh) {
1542                 /* Ok, the update went to disk for us, use the
1543                  * returned bh. */
1544                 *ret_bh = passed_bh;
1545                 get_bh(*ret_bh);
1546
1547                 return 0;
1548         }
1549
1550         status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1551                                   OCFS2_I(inode)->ip_blkno,
1552                                   ret_bh,
1553                                   OCFS2_BH_CACHED,
1554                                   inode);
1555         if (status < 0)
1556                 mlog_errno(status);
1557
1558         return status;
1559 }
1560
1561 /*
1562  * returns < 0 error if the callback will never be called, otherwise
1563  * the result of the lock will be communicated via the callback.
1564  */
1565 int ocfs2_meta_lock_full(struct inode *inode,
1566                          struct ocfs2_journal_handle *handle,
1567                          struct buffer_head **ret_bh,
1568                          int ex,
1569                          int arg_flags)
1570 {
1571         int status, level, dlm_flags, acquired;
1572         struct ocfs2_lock_res *lockres;
1573         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1574         struct buffer_head *local_bh = NULL;
1575
1576         BUG_ON(!inode);
1577
1578         mlog_entry_void();
1579
1580         mlog(0, "inode %llu, take %s META lock\n",
1581              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1582              ex ? "EXMODE" : "PRMODE");
1583
1584         status = 0;
1585         acquired = 0;
1586         /* We'll allow faking a readonly metadata lock for
1587          * rodevices. */
1588         if (ocfs2_is_hard_readonly(osb)) {
1589                 if (ex)
1590                         status = -EROFS;
1591                 goto bail;
1592         }
1593
1594         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1595                 wait_event(osb->recovery_event,
1596                            ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1597
1598         acquired = 0;
1599         lockres = &OCFS2_I(inode)->ip_meta_lockres;
1600         level = ex ? LKM_EXMODE : LKM_PRMODE;
1601         dlm_flags = 0;
1602         if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1603                 dlm_flags |= LKM_NOQUEUE;
1604
1605         status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1606         if (status < 0) {
1607                 if (status != -EAGAIN && status != -EIOCBRETRY)
1608                         mlog_errno(status);
1609                 goto bail;
1610         }
1611
1612         /* Notify the error cleanup path to drop the cluster lock. */
1613         acquired = 1;
1614
1615         /* We wait twice because a node may have died while we were in
1616          * the lower dlm layers. The second time though, we've
1617          * committed to owning this lock so we don't allow signals to
1618          * abort the operation. */
1619         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1620                 wait_event(osb->recovery_event,
1621                            ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1622
1623         /*
1624          * We only see this flag if we're being called from
1625          * ocfs2_read_locked_inode(). It means we're locking an inode
1626          * which hasn't been populated yet, so clear the refresh flag
1627          * and let the caller handle it.
1628          */
1629         if (inode->i_state & I_NEW) {
1630                 status = 0;
1631                 ocfs2_complete_lock_res_refresh(lockres, 0);
1632                 goto bail;
1633         }
1634
1635         /* This is fun. The caller may want a bh back, or it may
1636          * not. ocfs2_meta_lock_update definitely wants one in, but
1637          * may or may not read one, depending on what's in the
1638          * LVB. The result of all of this is that we've *only* gone to
1639          * disk if we have to, so the complexity is worthwhile. */
1640         status = ocfs2_meta_lock_update(inode, &local_bh);
1641         if (status < 0) {
1642                 if (status != -ENOENT)
1643                         mlog_errno(status);
1644                 goto bail;
1645         }
1646
1647         if (ret_bh) {
1648                 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1649                 if (status < 0) {
1650                         mlog_errno(status);
1651                         goto bail;
1652                 }
1653         }
1654
1655         if (handle) {
1656                 status = ocfs2_handle_add_lock(handle, inode);
1657                 if (status < 0)
1658                         mlog_errno(status);
1659         }
1660
1661 bail:
1662         if (status < 0) {
1663                 if (ret_bh && (*ret_bh)) {
1664                         brelse(*ret_bh);
1665                         *ret_bh = NULL;
1666                 }
1667                 if (acquired)
1668                         ocfs2_meta_unlock(inode, ex);
1669         }
1670
1671         if (local_bh)
1672                 brelse(local_bh);
1673
1674         mlog_exit(status);
1675         return status;
1676 }
1677
1678 /*
1679  * This is working around a lock inversion between tasks acquiring DLM locks
1680  * while holding a page lock and the vote thread which blocks dlm lock acquiry
1681  * while acquiring page locks.
1682  *
1683  * ** These _with_page variantes are only intended to be called from aop
1684  * methods that hold page locks and return a very specific *positive* error
1685  * code that aop methods pass up to the VFS -- test for errors with != 0. **
1686  *
1687  * The DLM is called such that it returns -EAGAIN if it would have blocked
1688  * waiting for the vote thread.  In that case we unlock our page so the vote
1689  * thread can make progress.  Once we've done this we have to return
1690  * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1691  * into the VFS who will then immediately retry the aop call.
1692  *
1693  * We do a blocking lock and immediate unlock before returning, though, so that
1694  * the lock has a great chance of being cached on this node by the time the VFS
1695  * calls back to retry the aop.    This has a potential to livelock as nodes
1696  * ping locks back and forth, but that's a risk we're willing to take to avoid
1697  * the lock inversion simply.
1698  */
1699 int ocfs2_meta_lock_with_page(struct inode *inode,
1700                               struct ocfs2_journal_handle *handle,
1701                               struct buffer_head **ret_bh,
1702                               int ex,
1703                               struct page *page)
1704 {
1705         int ret;
1706
1707         ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
1708                                    OCFS2_LOCK_NONBLOCK);
1709         if (ret == -EAGAIN) {
1710                 unlock_page(page);
1711                 if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
1712                         ocfs2_meta_unlock(inode, ex);
1713                 ret = AOP_TRUNCATED_PAGE;
1714         }
1715
1716         return ret;
1717 }
1718
1719 void ocfs2_meta_unlock(struct inode *inode,
1720                        int ex)
1721 {
1722         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1723         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1724
1725         mlog_entry_void();
1726
1727         mlog(0, "inode %llu drop %s META lock\n",
1728              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1729              ex ? "EXMODE" : "PRMODE");
1730
1731         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1732                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1733
1734         mlog_exit_void();
1735 }
1736
1737 int ocfs2_super_lock(struct ocfs2_super *osb,
1738                      int ex)
1739 {
1740         int status;
1741         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1742         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1743         struct buffer_head *bh;
1744         struct ocfs2_slot_info *si = osb->slot_info;
1745
1746         mlog_entry_void();
1747
1748         if (ocfs2_is_hard_readonly(osb))
1749                 return -EROFS;
1750
1751         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1752         if (status < 0) {
1753                 mlog_errno(status);
1754                 goto bail;
1755         }
1756
1757         /* The super block lock path is really in the best position to
1758          * know when resources covered by the lock need to be
1759          * refreshed, so we do it here. Of course, making sense of
1760          * everything is up to the caller :) */
1761         status = ocfs2_should_refresh_lock_res(lockres);
1762         if (status < 0) {
1763                 mlog_errno(status);
1764                 goto bail;
1765         }
1766         if (status) {
1767                 bh = si->si_bh;
1768                 status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
1769                                           si->si_inode);
1770                 if (status == 0)
1771                         ocfs2_update_slot_info(si);
1772
1773                 ocfs2_complete_lock_res_refresh(lockres, status);
1774
1775                 if (status < 0)
1776                         mlog_errno(status);
1777         }
1778 bail:
1779         mlog_exit(status);
1780         return status;
1781 }
1782
1783 void ocfs2_super_unlock(struct ocfs2_super *osb,
1784                         int ex)
1785 {
1786         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1787         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1788
1789         ocfs2_cluster_unlock(osb, lockres, level);
1790 }
1791
1792 int ocfs2_rename_lock(struct ocfs2_super *osb)
1793 {
1794         int status;
1795         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1796
1797         if (ocfs2_is_hard_readonly(osb))
1798                 return -EROFS;
1799
1800         status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1801         if (status < 0)
1802                 mlog_errno(status);
1803
1804         return status;
1805 }
1806
1807 void ocfs2_rename_unlock(struct ocfs2_super *osb)
1808 {
1809         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1810
1811         ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1812 }
1813
1814 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1815 {
1816         int ret;
1817         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1818         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1819         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1820
1821         BUG_ON(!dl);
1822
1823         if (ocfs2_is_hard_readonly(osb))
1824                 return -EROFS;
1825
1826         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1827         if (ret < 0)
1828                 mlog_errno(ret);
1829
1830         return ret;
1831 }
1832
1833 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1834 {
1835         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1836         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1837         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1838
1839         ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1840 }
1841
1842 /* Reference counting of the dlm debug structure. We want this because
1843  * open references on the debug inodes can live on after a mount, so
1844  * we can't rely on the ocfs2_super to always exist. */
1845 static void ocfs2_dlm_debug_free(struct kref *kref)
1846 {
1847         struct ocfs2_dlm_debug *dlm_debug;
1848
1849         dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
1850
1851         kfree(dlm_debug);
1852 }
1853
1854 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
1855 {
1856         if (dlm_debug)
1857                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
1858 }
1859
1860 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
1861 {
1862         kref_get(&debug->d_refcnt);
1863 }
1864
1865 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
1866 {
1867         struct ocfs2_dlm_debug *dlm_debug;
1868
1869         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
1870         if (!dlm_debug) {
1871                 mlog_errno(-ENOMEM);
1872                 goto out;
1873         }
1874
1875         kref_init(&dlm_debug->d_refcnt);
1876         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
1877         dlm_debug->d_locking_state = NULL;
1878 out:
1879         return dlm_debug;
1880 }
1881
1882 /* Access to this is arbitrated for us via seq_file->sem. */
1883 struct ocfs2_dlm_seq_priv {
1884         struct ocfs2_dlm_debug *p_dlm_debug;
1885         struct ocfs2_lock_res p_iter_res;
1886         struct ocfs2_lock_res p_tmp_res;
1887 };
1888
1889 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
1890                                                  struct ocfs2_dlm_seq_priv *priv)
1891 {
1892         struct ocfs2_lock_res *iter, *ret = NULL;
1893         struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
1894
1895         assert_spin_locked(&ocfs2_dlm_tracking_lock);
1896
1897         list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
1898                 /* discover the head of the list */
1899                 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
1900                         mlog(0, "End of list found, %p\n", ret);
1901                         break;
1902                 }
1903
1904                 /* We track our "dummy" iteration lockres' by a NULL
1905                  * l_ops field. */
1906                 if (iter->l_ops != NULL) {
1907                         ret = iter;
1908                         break;
1909                 }
1910         }
1911
1912         return ret;
1913 }
1914
1915 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
1916 {
1917         struct ocfs2_dlm_seq_priv *priv = m->private;
1918         struct ocfs2_lock_res *iter;
1919
1920         spin_lock(&ocfs2_dlm_tracking_lock);
1921         iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
1922         if (iter) {
1923                 /* Since lockres' have the lifetime of their container
1924                  * (which can be inodes, ocfs2_supers, etc) we want to
1925                  * copy this out to a temporary lockres while still
1926                  * under the spinlock. Obviously after this we can't
1927                  * trust any pointers on the copy returned, but that's
1928                  * ok as the information we want isn't typically held
1929                  * in them. */
1930                 priv->p_tmp_res = *iter;
1931                 iter = &priv->p_tmp_res;
1932         }
1933         spin_unlock(&ocfs2_dlm_tracking_lock);
1934
1935         return iter;
1936 }
1937
/* seq_file ->stop: intentionally empty.  ->start and ->next release the
 * tracking spinlock themselves before returning. */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
}
1941
1942 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
1943 {
1944         struct ocfs2_dlm_seq_priv *priv = m->private;
1945         struct ocfs2_lock_res *iter = v;
1946         struct ocfs2_lock_res *dummy = &priv->p_iter_res;
1947
1948         spin_lock(&ocfs2_dlm_tracking_lock);
1949         iter = ocfs2_dlm_next_res(iter, priv);
1950         list_del_init(&dummy->l_debug_list);
1951         if (iter) {
1952                 list_add(&dummy->l_debug_list, &iter->l_debug_list);
1953                 priv->p_tmp_res = *iter;
1954                 iter = &priv->p_tmp_res;
1955         }
1956         spin_unlock(&ocfs2_dlm_tracking_lock);
1957
1958         return iter;
1959 }
1960
1961 /* So that debugfs.ocfs2 can determine which format is being used */
1962 #define OCFS2_DLM_DEBUG_STR_VERSION 1
1963 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
1964 {
1965         int i;
1966         char *lvb;
1967         struct ocfs2_lock_res *lockres = v;
1968
1969         if (!lockres)
1970                 return -EINVAL;
1971
1972         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
1973
1974         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
1975                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
1976                            lockres->l_name,
1977                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
1978         else
1979                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
1980
1981         seq_printf(m, "%d\t"
1982                    "0x%lx\t"
1983                    "0x%x\t"
1984                    "0x%x\t"
1985                    "%u\t"
1986                    "%u\t"
1987                    "%d\t"
1988                    "%d\t",
1989                    lockres->l_level,
1990                    lockres->l_flags,
1991                    lockres->l_action,
1992                    lockres->l_unlock_action,
1993                    lockres->l_ro_holders,
1994                    lockres->l_ex_holders,
1995                    lockres->l_requested,
1996                    lockres->l_blocking);
1997
1998         /* Dump the raw LVB */
1999         lvb = lockres->l_lksb.lvb;
2000         for(i = 0; i < DLM_LVB_LEN; i++)
2001                 seq_printf(m, "0x%x\t", lvb[i]);
2002
2003         /* End the line */
2004         seq_printf(m, "\n");
2005         return 0;
2006 }
2007
2008 static struct seq_operations ocfs2_dlm_seq_ops = {
2009         .start =        ocfs2_dlm_seq_start,
2010         .stop =         ocfs2_dlm_seq_stop,
2011         .next =         ocfs2_dlm_seq_next,
2012         .show =         ocfs2_dlm_seq_show,
2013 };
2014
2015 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2016 {
2017         struct seq_file *seq = (struct seq_file *) file->private_data;
2018         struct ocfs2_dlm_seq_priv *priv = seq->private;
2019         struct ocfs2_lock_res *res = &priv->p_iter_res;
2020
2021         ocfs2_remove_lockres_tracking(res);
2022         ocfs2_put_dlm_debug(priv->p_dlm_debug);
2023         return seq_release_private(inode, file);
2024 }
2025
2026 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2027 {
2028         int ret;
2029         struct ocfs2_dlm_seq_priv *priv;
2030         struct seq_file *seq;
2031         struct ocfs2_super *osb;
2032
2033         priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2034         if (!priv) {
2035                 ret = -ENOMEM;
2036                 mlog_errno(ret);
2037                 goto out;
2038         }
2039         osb = (struct ocfs2_super *) inode->u.generic_ip;
2040         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2041         priv->p_dlm_debug = osb->osb_dlm_debug;
2042         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2043
2044         ret = seq_open(file, &ocfs2_dlm_seq_ops);
2045         if (ret) {
2046                 kfree(priv);
2047                 mlog_errno(ret);
2048                 goto out;
2049         }
2050
2051         seq = (struct seq_file *) file->private_data;
2052         seq->private = priv;
2053
2054         ocfs2_add_lockres_tracking(&priv->p_iter_res,
2055                                    priv->p_dlm_debug);
2056
2057 out:
2058         return ret;
2059 }
2060
2061 static const struct file_operations ocfs2_dlm_debug_fops = {
2062         .open =         ocfs2_dlm_debug_open,
2063         .release =      ocfs2_dlm_debug_release,
2064         .read =         seq_read,
2065         .llseek =       seq_lseek,
2066 };
2067
2068 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2069 {
2070         int ret = 0;
2071         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2072
2073         dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2074                                                          S_IFREG|S_IRUSR,
2075                                                          osb->osb_debug_root,
2076                                                          osb,
2077                                                          &ocfs2_dlm_debug_fops);
2078         if (!dlm_debug->d_locking_state) {
2079                 ret = -EINVAL;
2080                 mlog(ML_ERROR,
2081                      "Unable to create locking state debugfs file.\n");
2082                 goto out;
2083         }
2084
2085         ocfs2_get_dlm_debug(dlm_debug);
2086 out:
2087         return ret;
2088 }
2089
2090 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2091 {
2092         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2093
2094         if (dlm_debug) {
2095                 debugfs_remove(dlm_debug->d_locking_state);
2096                 ocfs2_put_dlm_debug(dlm_debug);
2097         }
2098 }
2099
2100 int ocfs2_dlm_init(struct ocfs2_super *osb)
2101 {
2102         int status;
2103         u32 dlm_key;
2104         struct dlm_ctxt *dlm;
2105
2106         mlog_entry_void();
2107
2108         status = ocfs2_dlm_init_debug(osb);
2109         if (status < 0) {
2110                 mlog_errno(status);
2111                 goto bail;
2112         }
2113
2114         /* launch vote thread */
2115         osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
2116         if (IS_ERR(osb->vote_task)) {
2117                 status = PTR_ERR(osb->vote_task);
2118                 osb->vote_task = NULL;
2119                 mlog_errno(status);
2120                 goto bail;
2121         }
2122
2123         /* used by the dlm code to make message headers unique, each
2124          * node in this domain must agree on this. */
2125         dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2126
2127         /* for now, uuid == domain */
2128         dlm = dlm_register_domain(osb->uuid_str, dlm_key);
2129         if (IS_ERR(dlm)) {
2130                 status = PTR_ERR(dlm);
2131                 mlog_errno(status);
2132                 goto bail;
2133         }
2134
2135         ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2136         ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2137
2138         dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2139
2140         osb->dlm = dlm;
2141
2142         status = 0;
2143 bail:
2144         if (status < 0) {
2145                 ocfs2_dlm_shutdown_debug(osb);
2146                 if (osb->vote_task)
2147                         kthread_stop(osb->vote_task);
2148         }
2149
2150         mlog_exit(status);
2151         return status;
2152 }
2153
2154 void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2155 {
2156         mlog_entry_void();
2157
2158         dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2159
2160         ocfs2_drop_osb_locks(osb);
2161
2162         if (osb->vote_task) {
2163                 kthread_stop(osb->vote_task);
2164                 osb->vote_task = NULL;
2165         }
2166
2167         ocfs2_lock_res_free(&osb->osb_super_lockres);
2168         ocfs2_lock_res_free(&osb->osb_rename_lockres);
2169
2170         dlm_unregister_domain(osb->dlm);
2171         osb->dlm = NULL;
2172
2173         ocfs2_dlm_shutdown_debug(osb);
2174
2175         mlog_exit_void();
2176 }
2177
2178 static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2179 {
2180         struct ocfs2_lock_res *lockres = opaque;
2181         unsigned long flags;
2182
2183         mlog_entry_void();
2184
2185         mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2186              lockres->l_unlock_action);
2187
2188         spin_lock_irqsave(&lockres->l_lock, flags);
2189         /* We tried to cancel a convert request, but it was already
2190          * granted. All we want to do here is clear our unlock
2191          * state. The wake_up call done at the bottom is redundant
2192          * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2193          * hurt anything anyway */
2194         if (status == DLM_CANCELGRANT &&
2195             lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2196                 mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2197
2198                 /* We don't clear the busy flag in this case as it
2199                  * should have been cleared by the ast which the dlm
2200                  * has called. */
2201                 goto complete_unlock;
2202         }
2203
2204         if (status != DLM_NORMAL) {
2205                 mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
2206                      "unlock_action %d\n", status, lockres->l_name,
2207                      lockres->l_unlock_action);
2208                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2209                 return;
2210         }
2211
2212         switch(lockres->l_unlock_action) {
2213         case OCFS2_UNLOCK_CANCEL_CONVERT:
2214                 mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2215                 lockres->l_action = OCFS2_AST_INVALID;
2216                 break;
2217         case OCFS2_UNLOCK_DROP_LOCK:
2218                 lockres->l_level = LKM_IVMODE;
2219                 break;
2220         default:
2221                 BUG();
2222         }
2223
2224         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2225 complete_unlock:
2226         lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2227         spin_unlock_irqrestore(&lockres->l_lock, flags);
2228
2229         wake_up(&lockres->l_event);
2230
2231         mlog_exit_void();
2232 }
2233
/* Hook invoked by ocfs2_drop_lock() on a lockres, with the lockres
 * spinlock held, just before the lock is dropped. */
typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);

/* Optional pre-drop hook plus the opaque argument handed to it. */
struct drop_lock_cb {
        ocfs2_pre_drop_cb_t     *drop_func;
        void                    *drop_data;
};
2240
2241 static int ocfs2_drop_lock(struct ocfs2_super *osb,
2242                            struct ocfs2_lock_res *lockres,
2243                            struct drop_lock_cb *dcb)
2244 {
2245         enum dlm_status status;
2246         unsigned long flags;
2247         int lkm_flags = 0;
2248
2249         /* We didn't get anywhere near actually using this lockres. */
2250         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2251                 goto out;
2252
2253         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2254                 lkm_flags |= LKM_VALBLK;
2255
2256         spin_lock_irqsave(&lockres->l_lock, flags);
2257
2258         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2259                         "lockres %s, flags 0x%lx\n",
2260                         lockres->l_name, lockres->l_flags);
2261
2262         while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2263                 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2264                      "%u, unlock_action = %u\n",
2265                      lockres->l_name, lockres->l_flags, lockres->l_action,
2266                      lockres->l_unlock_action);
2267
2268                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2269
2270                 /* XXX: Today we just wait on any busy
2271                  * locks... Perhaps we need to cancel converts in the
2272                  * future? */
2273                 ocfs2_wait_on_busy_lock(lockres);
2274
2275                 spin_lock_irqsave(&lockres->l_lock, flags);
2276         }
2277
2278         if (dcb)
2279                 dcb->drop_func(lockres, dcb->drop_data);
2280
2281         if (lockres->l_flags & OCFS2_LOCK_BUSY)
2282                 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2283                      lockres->l_name);
2284         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2285                 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2286
2287         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2288                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2289                 goto out;
2290         }
2291
2292         lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2293
2294         /* make sure we never get here while waiting for an ast to
2295          * fire. */
2296         BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2297
2298         /* is this necessary? */
2299         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2300         lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2301         spin_unlock_irqrestore(&lockres->l_lock, flags);
2302
2303         mlog(0, "lock %s\n", lockres->l_name);
2304
2305         status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2306                            ocfs2_unlock_ast, lockres);
2307         if (status != DLM_NORMAL) {
2308                 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2309                 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2310                 dlm_print_one_lock(lockres->l_lksb.lockid);
2311                 BUG();
2312         }
2313         mlog(0, "lock %s, successfull return from dlmunlock\n",
2314              lockres->l_name);
2315
2316         ocfs2_wait_on_busy_lock(lockres);
2317 out:
2318         mlog_exit(0);
2319         return 0;
2320 }
2321
2322 /* Mark the lockres as being dropped. It will no longer be
2323  * queued if blocking, but we still may have to wait on it
2324  * being dequeued from the vote thread before we can consider
2325  * it safe to drop. 
2326  *
2327  * You can *not* attempt to call cluster_lock on this lockres anymore. */
2328 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2329 {
2330         int status;
2331         struct ocfs2_mask_waiter mw;
2332         unsigned long flags;
2333
2334         ocfs2_init_mask_waiter(&mw);
2335
2336         spin_lock_irqsave(&lockres->l_lock, flags);
2337         lockres->l_flags |= OCFS2_LOCK_FREEING;
2338         while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2339                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2340                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2341
2342                 mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2343
2344                 status = ocfs2_wait_for_mask(&mw);
2345                 if (status)
2346                         mlog_errno(status);
2347
2348                 spin_lock_irqsave(&lockres->l_lock, flags);
2349         }
2350         spin_unlock_irqrestore(&lockres->l_lock, flags);
2351 }
2352
2353 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2354                                struct ocfs2_lock_res *lockres)
2355 {
2356         int ret;
2357
2358         ocfs2_mark_lockres_freeing(lockres);
2359         ret = ocfs2_drop_lock(osb, lockres, NULL);
2360         if (ret)
2361                 mlog_errno(ret);
2362 }
2363
2364 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2365 {
2366         ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2367         ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2368 }
2369
2370 static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
2371 {
2372         struct inode *inode = data;
2373
2374         /* the metadata lock requires a bit more work as we have an
2375          * LVB to worry about. */
2376         if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2377             lockres->l_level == LKM_EXMODE &&
2378             !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2379                 __ocfs2_stuff_meta_lvb(inode);
2380 }
2381
2382 int ocfs2_drop_inode_locks(struct inode *inode)
2383 {
2384         int status, err;
2385         struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };
2386
2387         mlog_entry_void();
2388
2389         /* No need to call ocfs2_mark_lockres_freeing here -
2390          * ocfs2_clear_inode has done it for us. */
2391
2392         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2393                               &OCFS2_I(inode)->ip_data_lockres,
2394                               NULL);
2395         if (err < 0)
2396                 mlog_errno(err);
2397
2398         status = err;
2399
2400         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2401                               &OCFS2_I(inode)->ip_meta_lockres,
2402                               &meta_dcb);
2403         if (err < 0)
2404                 mlog_errno(err);
2405         if (err < 0 && !status)
2406                 status = err;
2407
2408         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2409                               &OCFS2_I(inode)->ip_rw_lockres,
2410                               NULL);
2411         if (err < 0)
2412                 mlog_errno(err);
2413         if (err < 0 && !status)
2414                 status = err;
2415
2416         mlog_exit(status);
2417         return status;
2418 }
2419
2420 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2421                                       int new_level)
2422 {
2423         assert_spin_locked(&lockres->l_lock);
2424
2425         BUG_ON(lockres->l_blocking <= LKM_NLMODE);
2426
2427         if (lockres->l_level <= new_level) {
2428                 mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
2429                      lockres->l_level, new_level);
2430                 BUG();
2431         }
2432
2433         mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2434              lockres->l_name, new_level, lockres->l_blocking);
2435
2436         lockres->l_action = OCFS2_AST_DOWNCONVERT;
2437         lockres->l_requested = new_level;
2438         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2439 }
2440
2441 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2442                                   struct ocfs2_lock_res *lockres,
2443                                   int new_level,
2444                                   int lvb)
2445 {
2446         int ret, dlm_flags = LKM_CONVERT;
2447         enum dlm_status status;
2448
2449         mlog_entry_void();
2450
2451         if (lvb)
2452                 dlm_flags |= LKM_VALBLK;
2453
2454         status = dlmlock(osb->dlm,
2455                          new_level,
2456                          &lockres->l_lksb,
2457                          dlm_flags,
2458                          lockres->l_name,
2459                          OCFS2_LOCK_ID_MAX_LEN - 1,
2460                          ocfs2_locking_ast,
2461                          lockres,
2462                          ocfs2_blocking_ast);
2463         if (status != DLM_NORMAL) {
2464                 ocfs2_log_dlm_error("dlmlock", status, lockres);
2465                 ret = -EINVAL;
2466                 ocfs2_recover_from_dlm_error(lockres, 1);
2467                 goto bail;
2468         }
2469
2470         ret = 0;
2471 bail:
2472         mlog_exit(ret);
2473         return ret;
2474 }
2475
2476 /* returns 1 when the caller should unlock and call dlmunlock */
2477 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2478                                         struct ocfs2_lock_res *lockres)
2479 {
2480         assert_spin_locked(&lockres->l_lock);
2481
2482         mlog_entry_void();
2483         mlog(0, "lock %s\n", lockres->l_name);
2484
2485         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2486                 /* If we're already trying to cancel a lock conversion
2487                  * then just drop the spinlock and allow the caller to
2488                  * requeue this lock. */
2489
2490                 mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2491                 return 0;
2492         }
2493
2494         /* were we in a convert when we got the bast fire? */
2495         BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2496                lockres->l_action != OCFS2_AST_DOWNCONVERT);
2497         /* set things up for the unlockast to know to just
2498          * clear out the ast_action and unset busy, etc. */
2499         lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2500
2501         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2502                         "lock %s, invalid flags: 0x%lx\n",
2503                         lockres->l_name, lockres->l_flags);
2504
2505         return 1;
2506 }
2507
2508 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2509                                 struct ocfs2_lock_res *lockres)
2510 {
2511         int ret;
2512         enum dlm_status status;
2513
2514         mlog_entry_void();
2515         mlog(0, "lock %s\n", lockres->l_name);
2516
2517         ret = 0;
2518         status = dlmunlock(osb->dlm,
2519                            &lockres->l_lksb,
2520                            LKM_CANCEL,
2521                            ocfs2_unlock_ast,
2522                            lockres);
2523         if (status != DLM_NORMAL) {
2524                 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2525                 ret = -EINVAL;
2526                 ocfs2_recover_from_dlm_error(lockres, 0);
2527         }
2528
2529         mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2530
2531         mlog_exit(ret);
2532         return ret;
2533 }
2534
2535 static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
2536                                                   struct ocfs2_lock_res *lockres,
2537                                                   int new_level)
2538 {
2539         int ret;
2540
2541         mlog_entry_void();
2542
2543         BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2544
2545         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2546                 ret = 0;
2547                 mlog(0, "lockres %s currently being refreshed -- backing "
2548                      "off!\n", lockres->l_name);
2549         } else if (new_level == LKM_PRMODE)
2550                 ret = !lockres->l_ex_holders &&
2551                         ocfs2_inode_fully_checkpointed(inode);
2552         else /* Must be NLMODE we're converting to. */
2553                 ret = !lockres->l_ro_holders && !lockres->l_ex_holders &&
2554                         ocfs2_inode_fully_checkpointed(inode);
2555
2556         mlog_exit(ret);
2557         return ret;
2558 }
2559
2560 static int ocfs2_do_unblock_meta(struct inode *inode,
2561                                  int *requeue)
2562 {
2563         int new_level;
2564         int set_lvb = 0;
2565         int ret = 0;
2566         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
2567         unsigned long flags;
2568
2569         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2570
2571         mlog_entry_void();
2572
2573         spin_lock_irqsave(&lockres->l_lock, flags);
2574
2575         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2576
2577         mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level,
2578              lockres->l_blocking);
2579
2580         BUG_ON(lockres->l_level != LKM_EXMODE &&
2581                lockres->l_level != LKM_PRMODE);
2582
2583         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2584                 *requeue = 1;
2585                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2586                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2587                 if (ret) {
2588                         ret = ocfs2_cancel_convert(osb, lockres);
2589                         if (ret < 0)
2590                                 mlog_errno(ret);
2591                 }
2592                 goto leave;
2593         }
2594
2595         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2596
2597         mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n",
2598              lockres->l_level, lockres->l_blocking, new_level);
2599
2600         if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) {
2601                 if (lockres->l_level == LKM_EXMODE)
2602                         set_lvb = 1;
2603
2604                 /* If the lock hasn't been refreshed yet (rare), then
2605                  * our memory inode values are old and we skip
2606                  * stuffing the lvb. There's no need to actually clear
2607                  * out the lvb here as it's value is still valid. */
2608                 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2609                         if (set_lvb)
2610                                 __ocfs2_stuff_meta_lvb(inode);
2611                 } else
2612                         mlog(0, "lockres %s: downconverting stale lock!\n",
2613                              lockres->l_name);
2614
2615                 mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, "
2616                      "l_blocking=%d, new_level=%d\n",
2617                      lockres->l_level, lockres->l_blocking, new_level);
2618
2619                 ocfs2_prepare_downconvert(lockres, new_level);
2620                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2621                 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
2622                 goto leave;
2623         }
2624         if (!ocfs2_inode_fully_checkpointed(inode))
2625                 ocfs2_start_checkpoint(osb);
2626
2627         *requeue = 1;
2628         spin_unlock_irqrestore(&lockres->l_lock, flags);
2629         ret = 0;
2630 leave:
2631         mlog_exit(ret);
2632         return ret;
2633 }
2634
2635 static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
2636                                       struct ocfs2_lock_res *lockres,
2637                                       struct ocfs2_unblock_ctl *ctl,
2638                                       ocfs2_convert_worker_t *worker)
2639 {
2640         unsigned long flags;
2641         int blocking;
2642         int new_level;
2643         int ret = 0;
2644         int set_lvb = 0;
2645
2646         mlog_entry_void();
2647
2648         spin_lock_irqsave(&lockres->l_lock, flags);
2649
2650         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2651
2652 recheck:
2653         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2654                 ctl->requeue = 1;
2655                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2656                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2657                 if (ret) {
2658                         ret = ocfs2_cancel_convert(osb, lockres);
2659                         if (ret < 0)
2660                                 mlog_errno(ret);
2661                 }
2662                 goto leave;
2663         }
2664
2665         /* if we're blocking an exclusive and we have *any* holders,
2666          * then requeue. */
2667         if ((lockres->l_blocking == LKM_EXMODE)
2668             && (lockres->l_ex_holders || lockres->l_ro_holders))
2669                 goto leave_requeue;
2670
2671         /* If it's a PR we're blocking, then only
2672          * requeue if we've got any EX holders */
2673         if (lockres->l_blocking == LKM_PRMODE &&
2674             lockres->l_ex_holders)
2675                 goto leave_requeue;
2676
2677         /*
2678          * Can we get a lock in this state if the holder counts are
2679          * zero? The meta data unblock code used to check this.
2680          */
2681         if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
2682             && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
2683                 goto leave_requeue;
2684
2685         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2686
2687         if (lockres->l_ops->check_downconvert
2688             && !lockres->l_ops->check_downconvert(lockres, new_level))
2689                 goto leave_requeue;
2690
2691         /* If we get here, then we know that there are no more
2692          * incompatible holders (and anyone asking for an incompatible
2693          * lock is blocked). We can now downconvert the lock */
2694         if (!worker)
2695                 goto downconvert;
2696
2697         /* Some lockres types want to do a bit of work before
2698          * downconverting a lock. Allow that here. The worker function
2699          * may sleep, so we save off a copy of what we're blocking as
2700          * it may change while we're not holding the spin lock. */
2701         blocking = lockres->l_blocking;
2702         spin_unlock_irqrestore(&lockres->l_lock, flags);
2703
2704         ctl->unblock_action = worker(lockres, blocking);
2705
2706         if (ctl->unblock_action == UNBLOCK_STOP_POST)
2707                 goto leave;
2708
2709         spin_lock_irqsave(&lockres->l_lock, flags);
2710         if (blocking != lockres->l_blocking) {
2711                 /* If this changed underneath us, then we can't drop
2712                  * it just yet. */
2713                 goto recheck;
2714         }
2715
2716 downconvert:
2717         ctl->requeue = 0;
2718
2719         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2720                 if (lockres->l_level == LKM_EXMODE)
2721                         set_lvb = 1;
2722
2723                 /*
2724                  * We only set the lvb if the lock has been fully
2725                  * refreshed - otherwise we risk setting stale
2726                  * data. Otherwise, there's no need to actually clear
2727                  * out the lvb here as it's value is still valid.
2728                  */
2729                 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2730                         lockres->l_ops->set_lvb(lockres);
2731         }
2732
2733         ocfs2_prepare_downconvert(lockres, new_level);
2734         spin_unlock_irqrestore(&lockres->l_lock, flags);
2735         ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
2736 leave:
2737         mlog_exit(ret);
2738         return ret;
2739
2740 leave_requeue:
2741         spin_unlock_irqrestore(&lockres->l_lock, flags);
2742         ctl->requeue = 1;
2743
2744         mlog_exit(0);
2745         return 0;
2746 }
2747
2748 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2749                                      int blocking)
2750 {
2751         struct inode *inode;
2752         struct address_space *mapping;
2753
2754         inode = ocfs2_lock_res_inode(lockres);
2755         mapping = inode->i_mapping;
2756
2757         if (filemap_fdatawrite(mapping)) {
2758                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2759                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
2760         }
2761         sync_mapping_buffers(mapping);
2762         if (blocking == LKM_EXMODE) {
2763                 truncate_inode_pages(mapping, 0);
2764                 unmap_mapping_range(mapping, 0, 0, 0);
2765         } else {
2766                 /* We only need to wait on the I/O if we're not also
2767                  * truncating pages because truncate_inode_pages waits
2768                  * for us above. We don't truncate pages if we're
2769                  * blocking anything < EXMODE because we want to keep
2770                  * them around in that case. */
2771                 filemap_fdatawait(mapping);
2772         }
2773
2774         return UNBLOCK_CONTINUE;
2775 }
2776
2777 int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
2778                        struct ocfs2_unblock_ctl *ctl)
2779 {
2780         int status;
2781         struct inode *inode;
2782         struct ocfs2_super *osb;
2783
2784         mlog_entry_void();
2785
2786         inode = ocfs2_lock_res_inode(lockres);
2787         osb = OCFS2_SB(inode->i_sb);
2788
2789         mlog(0, "unblock inode %llu\n",
2790              (unsigned long long)OCFS2_I(inode)->ip_blkno);
2791
2792         status = ocfs2_generic_unblock_lock(osb, lockres, ctl,
2793                                             ocfs2_data_convert_worker);
2794         if (status < 0)
2795                 mlog_errno(status);
2796
2797         mlog(0, "inode %llu, requeue = %d\n",
2798              (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2799
2800         mlog_exit(status);
2801         return status;
2802 }
2803
2804 static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2805                                     struct ocfs2_unblock_ctl *ctl)
2806 {
2807         int status;
2808         struct inode *inode;
2809
2810         mlog_entry_void();
2811
2812         mlog(0, "Unblock lockres %s\n", lockres->l_name);
2813
2814         inode  = ocfs2_lock_res_inode(lockres);
2815
2816         status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2817                                             lockres, ctl, NULL);
2818         if (status < 0)
2819                 mlog_errno(status);
2820
2821         mlog_exit(status);
2822         return status;
2823 }
2824
2825 static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
2826                               struct ocfs2_unblock_ctl *ctl)
2827 {
2828         int status;
2829         struct inode *inode;
2830
2831         mlog_entry_void();
2832
2833         inode = ocfs2_lock_res_inode(lockres);
2834
2835         mlog(0, "unblock inode %llu\n",
2836              (unsigned long long)OCFS2_I(inode)->ip_blkno);
2837
2838         status = ocfs2_do_unblock_meta(inode, &ctl->requeue);
2839         if (status < 0)
2840                 mlog_errno(status);
2841
2842         mlog(0, "inode %llu, requeue = %d\n",
2843              (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2844
2845         mlog_exit(status);
2846         return status;
2847 }
2848
/*
 * post_unlock hook for dentry locks: drops a reference on the
 * ocfs2_dentry_lock behind @lockres (the extra one taken by the
 * convert worker). Right now this happens in the vote thread, but we
 * could choose to simplify the dlmglue API and push these off to the
 * ocfs2_wq in the future.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
                                     struct ocfs2_lock_res *lockres)
{
        ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
2860
2861 /*
2862  * d_delete() matching dentries before the lock downconvert.
2863  *
2864  * At this point, any process waiting to destroy the
2865  * dentry_lock due to last ref count is stopped by the
2866  * OCFS2_LOCK_QUEUED flag.
2867  *
2868  * We have two potential problems
2869  *
2870  * 1) If we do the last reference drop on our dentry_lock (via dput)
2871  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
2872  *    the downconvert to finish. Instead we take an elevated
2873  *    reference and push the drop until after we've completed our
2874  *    unblock processing.
2875  *
2876  * 2) There might be another process with a final reference,
2877  *    waiting on us to finish processing. If this is the case, we
2878  *    detect it and exit out - there's no more dentries anyway.
2879  */
2880 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2881                                        int blocking)
2882 {
2883         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2884         struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
2885         struct dentry *dentry;
2886         unsigned long flags;
2887         int extra_ref = 0;
2888
2889         /*
2890          * This node is blocking another node from getting a read
2891          * lock. This happens when we've renamed within a
2892          * directory. We've forced the other nodes to d_delete(), but
2893          * we never actually dropped our lock because it's still
2894          * valid. The downconvert code will retain a PR for this node,
2895          * so there's no further work to do.
2896          */
2897         if (blocking == LKM_PRMODE)
2898                 return UNBLOCK_CONTINUE;
2899
2900         /*
2901          * Mark this inode as potentially orphaned. The code in
2902          * ocfs2_delete_inode() will figure out whether it actually
2903          * needs to be freed or not.
2904          */
2905         spin_lock(&oi->ip_lock);
2906         oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2907         spin_unlock(&oi->ip_lock);
2908
2909         /*
2910          * Yuck. We need to make sure however that the check of
2911          * OCFS2_LOCK_FREEING and the extra reference are atomic with
2912          * respect to a reference decrement or the setting of that
2913          * flag.
2914          */
2915         spin_lock_irqsave(&lockres->l_lock, flags);
2916         spin_lock(&dentry_attach_lock);
2917         if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
2918             && dl->dl_count) {
2919                 dl->dl_count++;
2920                 extra_ref = 1;
2921         }
2922         spin_unlock(&dentry_attach_lock);
2923         spin_unlock_irqrestore(&lockres->l_lock, flags);
2924
2925         mlog(0, "extra_ref = %d\n", extra_ref);
2926
2927         /*
2928          * We have a process waiting on us in ocfs2_dentry_iput(),
2929          * which means we can't have any more outstanding
2930          * aliases. There's no need to do any more work.
2931          */
2932         if (!extra_ref)
2933                 return UNBLOCK_CONTINUE;
2934
2935         spin_lock(&dentry_attach_lock);
2936         while (1) {
2937                 dentry = ocfs2_find_local_alias(dl->dl_inode,
2938                                                 dl->dl_parent_blkno, 1);
2939                 if (!dentry)
2940                         break;
2941                 spin_unlock(&dentry_attach_lock);
2942
2943                 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
2944                      dentry->d_name.name);
2945
2946                 /*
2947                  * The following dcache calls may do an
2948                  * iput(). Normally we don't want that from the
2949                  * downconverting thread, but in this case it's ok
2950                  * because the requesting node already has an
2951                  * exclusive lock on the inode, so it can't be queued
2952                  * for a downconvert.
2953                  */
2954                 d_delete(dentry);
2955                 dput(dentry);
2956
2957                 spin_lock(&dentry_attach_lock);
2958         }
2959         spin_unlock(&dentry_attach_lock);
2960
2961         /*
2962          * If we are the last holder of this dentry lock, there is no
2963          * reason to downconvert so skip straight to the unlock.
2964          */
2965         if (dl->dl_count == 1)
2966                 return UNBLOCK_STOP_POST;
2967
2968         return UNBLOCK_CONTINUE_POST;
2969 }
2970
2971 static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
2972                                      struct ocfs2_unblock_ctl *ctl)
2973 {
2974         int ret;
2975         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2976         struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
2977
2978         mlog(0, "unblock dentry lock: %llu\n",
2979              (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
2980
2981         ret = ocfs2_generic_unblock_lock(osb,
2982                                          lockres,
2983                                          ctl,
2984                                          ocfs2_dentry_convert_worker);
2985         if (ret < 0)
2986                 mlog_errno(ret);
2987
2988         mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
2989
2990         return ret;
2991 }
2992
2993 /* Generic unblock function for any lockres whose private data is an
2994  * ocfs2_super pointer. */
2995 static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
2996                                   struct ocfs2_unblock_ctl *ctl)
2997 {
2998         int status;
2999         struct ocfs2_super *osb;
3000
3001         mlog_entry_void();
3002
3003         mlog(0, "Unblock lockres %s\n", lockres->l_name);
3004
3005         osb = ocfs2_get_lockres_osb(lockres);
3006
3007         status = ocfs2_generic_unblock_lock(osb,
3008                                             lockres,
3009                                             ctl,
3010                                             NULL);
3011         if (status < 0)
3012                 mlog_errno(status);
3013
3014         mlog_exit(status);
3015         return status;
3016 }
3017
3018 void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3019                                 struct ocfs2_lock_res *lockres)
3020 {
3021         int status;
3022         struct ocfs2_unblock_ctl ctl = {0, 0,};
3023         unsigned long flags;
3024
3025         /* Our reference to the lockres in this function can be
3026          * considered valid until we remove the OCFS2_LOCK_QUEUED
3027          * flag. */
3028
3029         mlog_entry_void();
3030
3031         BUG_ON(!lockres);
3032         BUG_ON(!lockres->l_ops);
3033         BUG_ON(!lockres->l_ops->unblock);
3034
3035         mlog(0, "lockres %s blocked.\n", lockres->l_name);
3036
3037         /* Detect whether a lock has been marked as going away while
3038          * the vote thread was processing other things. A lock can
3039          * still be marked with OCFS2_LOCK_FREEING after this check,
3040          * but short circuiting here will still save us some
3041          * performance. */
3042         spin_lock_irqsave(&lockres->l_lock, flags);
3043         if (lockres->l_flags & OCFS2_LOCK_FREEING)
3044                 goto unqueue;
3045         spin_unlock_irqrestore(&lockres->l_lock, flags);
3046
3047         status = lockres->l_ops->unblock(lockres, &ctl);
3048         if (status < 0)
3049                 mlog_errno(status);
3050
3051         spin_lock_irqsave(&lockres->l_lock, flags);
3052 unqueue:
3053         if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3054                 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3055         } else
3056                 ocfs2_schedule_blocked_lock(osb, lockres);
3057
3058         mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3059              ctl.requeue ? "yes" : "no");
3060         spin_unlock_irqrestore(&lockres->l_lock, flags);
3061
3062         if (ctl.unblock_action != UNBLOCK_CONTINUE
3063             && lockres->l_ops->post_unlock)
3064                 lockres->l_ops->post_unlock(osb, lockres);
3065
3066         mlog_exit_void();
3067 }
3068
3069 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3070                                         struct ocfs2_lock_res *lockres)
3071 {
3072         mlog_entry_void();
3073
3074         assert_spin_locked(&lockres->l_lock);
3075
3076         if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3077                 /* Do not schedule a lock for downconvert when it's on
3078                  * the way to destruction - any nodes wanting access
3079                  * to the resource will get it soon. */
3080                 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3081                      lockres->l_name, lockres->l_flags);
3082                 return;
3083         }
3084
3085         lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3086
3087         spin_lock(&osb->vote_task_lock);
3088         if (list_empty(&lockres->l_blocked_list)) {
3089                 list_add_tail(&lockres->l_blocked_list,
3090                               &osb->blocked_lock_list);
3091                 osb->blocked_lock_count++;
3092         }
3093         spin_unlock(&osb->vote_task_lock);
3094
3095         mlog_exit_void();
3096 }
3097
3098 /* This aids in debugging situations where a bad LVB might be involved. */
3099 void ocfs2_dump_meta_lvb_info(u64 level,
3100                               const char *function,
3101                               unsigned int line,
3102                               struct ocfs2_lock_res *lockres)
3103 {
3104         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
3105
3106         mlog(level, "LVB information for %s (called from %s:%u):\n",
3107              lockres->l_name, function, line);
3108         mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
3109              lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
3110              be32_to_cpu(lvb->lvb_igeneration));
3111         mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
3112              (unsigned long long)be64_to_cpu(lvb->lvb_isize),
3113              be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
3114              be16_to_cpu(lvb->lvb_imode));
3115         mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
3116              "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
3117              (long long)be64_to_cpu(lvb->lvb_iatime_packed),
3118              (long long)be64_to_cpu(lvb->lvb_ictime_packed),
3119              (long long)be64_to_cpu(lvb->lvb_imtime_packed),
3120              be32_to_cpu(lvb->lvb_iattr));
3121 }