Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfashe...
authorLinus Torvalds <torvalds@g5.osdl.org>
Sun, 24 Sep 2006 22:28:50 +0000 (15:28 -0700)
committerLinus Torvalds <torvalds@g5.osdl.org>
Sun, 24 Sep 2006 22:28:50 +0000 (15:28 -0700)
* 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2: (28 commits)
  ocfs2: Teach ocfs2_drop_lock() to use ->set_lvb() callback
  ocfs2: Remove ->unblock lockres operation
  ocfs2: move downconvert worker to lockres ops
  ocfs2: Remove unused dlmglue functions
  ocfs2: Have the metadata lock use generic dlmglue functions
  ocfs2: Add ->set_lvb callback in dlmglue
  ocfs2: Add ->check_downconvert callback in dlmglue
  ocfs2: Check for refreshing locks in generic unblock function
  ocfs2: don't unconditionally pass LVB flags
  ocfs2: combine inode and generic blocking AST functions
  ocfs2: Add ->get_osb() dlmglue locking operation
  ocfs2: remove ->unlock_ast() callback from ocfs2_lock_res_ops
  ocfs2: combine inode and generic AST functions
  ocfs2: Clean up lock resource refresh flags
  ocfs2: Remove i_generation from inode lock names
  ocfs2: Encode i_generation in the meta data lvb
  ocfs2: Free up some space in the lvb
  ocfs2: Remove special casing for inode creation in ocfs2_dentry_attach_lock()
  ocfs2: manually d_move() during ocfs2_rename()
  [PATCH] Allow file systems to manually d_move() inside of ->rename()
  ...

27 files changed:
fs/namei.c
fs/nfs/dir.c
fs/nfs/super.c
fs/ocfs2/cluster/tcp_internal.h
fs/ocfs2/dcache.c
fs/ocfs2/dcache.h
fs/ocfs2/dlm/dlmapi.h
fs/ocfs2/dlm/dlmast.c
fs/ocfs2/dlm/dlmcommon.h
fs/ocfs2/dlm/dlmlock.c
fs/ocfs2/dlm/dlmmaster.c
fs/ocfs2/dlm/dlmrecovery.c
fs/ocfs2/dlm/userdlm.c
fs/ocfs2/dlm/userdlm.h
fs/ocfs2/dlmglue.c
fs/ocfs2/dlmglue.h
fs/ocfs2/export.c
fs/ocfs2/inode.c
fs/ocfs2/inode.h
fs/ocfs2/journal.c
fs/ocfs2/namei.c
fs/ocfs2/ocfs2_lockid.h
fs/ocfs2/super.c
fs/ocfs2/sysfile.c
fs/ocfs2/vote.c
fs/ocfs2/vote.h
include/linux/fs.h

index 432d6bc..6b591c0 100644 (file)
@@ -2370,7 +2370,8 @@ static int vfs_rename_dir(struct inode *old_dir, struct dentry *old_dentry,
                dput(new_dentry);
        }
        if (!error)
-               d_move(old_dentry,new_dentry);
+               if (!(old_dir->i_sb->s_type->fs_flags & FS_RENAME_DOES_D_MOVE))
+                       d_move(old_dentry,new_dentry);
        return error;
 }
 
@@ -2393,8 +2394,7 @@ static int vfs_rename_other(struct inode *old_dir, struct dentry *old_dentry,
        else
                error = old_dir->i_op->rename(old_dir, old_dentry, new_dir, new_dentry);
        if (!error) {
-               /* The following d_move() should become unconditional */
-               if (!(old_dir->i_sb->s_type->fs_flags & FS_ODD_RENAME))
+               if (!(old_dir->i_sb->s_type->fs_flags & FS_RENAME_DOES_D_MOVE))
                        d_move(old_dentry, new_dentry);
        }
        if (target)
index 3419c2d..7432f1a 100644 (file)
@@ -1669,8 +1669,7 @@ out:
        if (rehash)
                d_rehash(rehash);
        if (!error) {
-               if (!S_ISDIR(old_inode->i_mode))
-                       d_move(old_dentry, new_dentry);
+               d_move(old_dentry, new_dentry);
                nfs_renew_times(new_dentry);
                nfs_set_verifier(new_dentry, nfs_save_change_attribute(new_dir));
        }
index b99113b..e8d4003 100644 (file)
@@ -71,7 +71,7 @@ static struct file_system_type nfs_fs_type = {
        .name           = "nfs",
        .get_sb         = nfs_get_sb,
        .kill_sb        = nfs_kill_super,
-       .fs_flags       = FS_ODD_RENAME|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
+       .fs_flags       = FS_RENAME_DOES_D_MOVE|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
 };
 
 struct file_system_type nfs_xdev_fs_type = {
@@ -79,7 +79,7 @@ struct file_system_type nfs_xdev_fs_type = {
        .name           = "nfs",
        .get_sb         = nfs_xdev_get_sb,
        .kill_sb        = nfs_kill_super,
-       .fs_flags       = FS_ODD_RENAME|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
+       .fs_flags       = FS_RENAME_DOES_D_MOVE|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
 };
 
 static struct super_operations nfs_sops = {
@@ -107,7 +107,7 @@ static struct file_system_type nfs4_fs_type = {
        .name           = "nfs4",
        .get_sb         = nfs4_get_sb,
        .kill_sb        = nfs4_kill_super,
-       .fs_flags       = FS_ODD_RENAME|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
+       .fs_flags       = FS_RENAME_DOES_D_MOVE|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
 };
 
 struct file_system_type nfs4_xdev_fs_type = {
@@ -115,7 +115,7 @@ struct file_system_type nfs4_xdev_fs_type = {
        .name           = "nfs4",
        .get_sb         = nfs4_xdev_get_sb,
        .kill_sb        = nfs4_kill_super,
-       .fs_flags       = FS_ODD_RENAME|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
+       .fs_flags       = FS_RENAME_DOES_D_MOVE|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
 };
 
 struct file_system_type nfs4_referral_fs_type = {
@@ -123,7 +123,7 @@ struct file_system_type nfs4_referral_fs_type = {
        .name           = "nfs4",
        .get_sb         = nfs4_referral_get_sb,
        .kill_sb        = nfs4_kill_super,
-       .fs_flags       = FS_ODD_RENAME|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
+       .fs_flags       = FS_RENAME_DOES_D_MOVE|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
 };
 
 static struct super_operations nfs4_sops = {
index ff9e2e2..4b46aac 100644 (file)
  * locking semantics of the file system using the protocol.  It should 
  * be somewhere else, I'm sure, but right now it isn't.
  *
+ * New in version 4:
+ *     - Remove i_generation from lock names for better stat performance.
+ *
+ * New in version 3:
+ *     - Replace dentry votes with a cluster lock
+ *
  * New in version 2:
  *     - full 64 bit i_size in the metadata lock lvbs
  *     - introduction of "rw" lock and pushing meta/data locking down
  */
-#define O2NET_PROTOCOL_VERSION 2ULL
+#define O2NET_PROTOCOL_VERSION 4ULL
 struct o2net_handshake {
        __be64  protocol_version;
        __be64  connector_id;
index 1a01380..014e739 100644 (file)
 
 #include "alloc.h"
 #include "dcache.h"
+#include "dlmglue.h"
 #include "file.h"
 #include "inode.h"
 
+
 static int ocfs2_dentry_revalidate(struct dentry *dentry,
                                   struct nameidata *nd)
 {
        struct inode *inode = dentry->d_inode;
        int ret = 0;    /* if all else fails, just return false */
-       struct ocfs2_super *osb;
+       struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
 
        mlog_entry("(0x%p, '%.*s')\n", dentry,
                   dentry->d_name.len, dentry->d_name.name);
@@ -55,28 +57,31 @@ static int ocfs2_dentry_revalidate(struct dentry *dentry,
                goto bail;
        }
 
-       osb = OCFS2_SB(inode->i_sb);
-
        BUG_ON(!osb);
 
-       if (inode != osb->root_inode) {
-               spin_lock(&OCFS2_I(inode)->ip_lock);
-               /* did we or someone else delete this inode? */
-               if (OCFS2_I(inode)->ip_flags & OCFS2_INODE_DELETED) {
-                       spin_unlock(&OCFS2_I(inode)->ip_lock);
-                       mlog(0, "inode (%llu) deleted, returning false\n",
-                            (unsigned long long)OCFS2_I(inode)->ip_blkno);
-                       goto bail;
-               }
+       if (inode == osb->root_inode || is_bad_inode(inode))
+               goto bail;
+
+       spin_lock(&OCFS2_I(inode)->ip_lock);
+       /* did we or someone else delete this inode? */
+       if (OCFS2_I(inode)->ip_flags & OCFS2_INODE_DELETED) {
                spin_unlock(&OCFS2_I(inode)->ip_lock);
+               mlog(0, "inode (%llu) deleted, returning false\n",
+                    (unsigned long long)OCFS2_I(inode)->ip_blkno);
+               goto bail;
+       }
+       spin_unlock(&OCFS2_I(inode)->ip_lock);
 
-               if (!inode->i_nlink) {
-                       mlog(0, "Inode %llu orphaned, returning false "
-                            "dir = %d\n",
-                            (unsigned long long)OCFS2_I(inode)->ip_blkno,
-                            S_ISDIR(inode->i_mode));
-                       goto bail;
-               }
+       /*
+        * We don't need a cluster lock to test this because once an
+        * inode nlink hits zero, it never goes back.
+        */
+       if (inode->i_nlink == 0) {
+               mlog(0, "Inode %llu orphaned, returning false "
+                    "dir = %d\n",
+                    (unsigned long long)OCFS2_I(inode)->ip_blkno,
+                    S_ISDIR(inode->i_mode));
+               goto bail;
        }
 
        ret = 1;
@@ -87,6 +92,322 @@ bail:
        return ret;
 }
 
+static int ocfs2_match_dentry(struct dentry *dentry,
+                             u64 parent_blkno,
+                             int skip_unhashed)
+{
+       struct inode *parent;
+
+       /*
+        * ocfs2_lookup() does a d_splice_alias() _before_ attaching
+        * to the lock data, so we skip those here, otherwise
+        * ocfs2_dentry_attach_lock() will get its original dentry
+        * back.
+        */
+       if (!dentry->d_fsdata)
+               return 0;
+
+       if (!dentry->d_parent)
+               return 0;
+
+       if (skip_unhashed && d_unhashed(dentry))
+               return 0;
+
+       parent = dentry->d_parent->d_inode;
+       /* Negative parent dentry? */
+       if (!parent)
+               return 0;
+
+       /* Name is in a different directory. */
+       if (OCFS2_I(parent)->ip_blkno != parent_blkno)
+               return 0;
+
+       return 1;
+}
+
+/*
+ * Walk the inode alias list, and find a dentry which has a given
+ * parent. ocfs2_dentry_attach_lock() wants to find _any_ alias as it
+ * is looking for a dentry_lock reference. The vote thread is looking
+ * to unhash aliases, so we allow it to skip any that already have
+ * that property.
+ */
+struct dentry *ocfs2_find_local_alias(struct inode *inode,
+                                     u64 parent_blkno,
+                                     int skip_unhashed)
+{
+       struct list_head *p;
+       struct dentry *dentry = NULL;
+
+       spin_lock(&dcache_lock);
+
+       list_for_each(p, &inode->i_dentry) {
+               dentry = list_entry(p, struct dentry, d_alias);
+
+               if (ocfs2_match_dentry(dentry, parent_blkno, skip_unhashed)) {
+                       mlog(0, "dentry found: %.*s\n",
+                            dentry->d_name.len, dentry->d_name.name);
+
+                       dget_locked(dentry);
+                       break;
+               }
+
+               dentry = NULL;
+       }
+
+       spin_unlock(&dcache_lock);
+
+       return dentry;
+}
+
+DEFINE_SPINLOCK(dentry_attach_lock);
+
+/*
+ * Attach this dentry to a cluster lock.
+ *
+ * Dentry locks cover all links in a given directory to a particular
+ * inode. We do this so that ocfs2 can build a lock name which all
+ * nodes in the cluster can agree on at all times. Shoving full names
+ * in the cluster lock won't work due to size restrictions. Covering
+ * links inside of a directory is a good compromise because it still
+ * allows us to use the parent directory lock to synchronize
+ * operations.
+ *
+ * Call this function with the parent dir semaphore and the parent dir
+ * cluster lock held.
+ *
+ * The dir semaphore will protect us from having to worry about
+ * concurrent processes on our node trying to attach a lock at the
+ * same time.
+ *
+ * The dir cluster lock (held at either PR or EX mode) protects us
+ * from unlink and rename on other nodes.
+ *
+ * A dput() can happen asynchronously due to pruning, so we cover
+ * attaching and detaching the dentry lock with a
+ * dentry_attach_lock.
+ *
+ * A node which has done lookup on a name retains a protected read
+ * lock until final dput. If the user requests and unlink or rename,
+ * the protected read is upgraded to an exclusive lock. Other nodes
+ * who have seen the dentry will then be informed that they need to
+ * downgrade their lock, which will involve d_delete on the
+ * dentry. This happens in ocfs2_dentry_convert_worker().
+ */
+int ocfs2_dentry_attach_lock(struct dentry *dentry,
+                            struct inode *inode,
+                            u64 parent_blkno)
+{
+       int ret;
+       struct dentry *alias;
+       struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+
+       mlog(0, "Attach \"%.*s\", parent %llu, fsdata: %p\n",
+            dentry->d_name.len, dentry->d_name.name,
+            (unsigned long long)parent_blkno, dl);
+
+       /*
+        * Negative dentry. We ignore these for now.
+        *
+        * XXX: Could we can improve ocfs2_dentry_revalidate() by
+        * tracking these?
+        */
+       if (!inode)
+               return 0;
+
+       if (dl) {
+               mlog_bug_on_msg(dl->dl_parent_blkno != parent_blkno,
+                               " \"%.*s\": old parent: %llu, new: %llu\n",
+                               dentry->d_name.len, dentry->d_name.name,
+                               (unsigned long long)parent_blkno,
+                               (unsigned long long)dl->dl_parent_blkno);
+               return 0;
+       }
+
+       alias = ocfs2_find_local_alias(inode, parent_blkno, 0);
+       if (alias) {
+               /*
+                * Great, an alias exists, which means we must have a
+                * dentry lock already. We can just grab the lock off
+                * the alias and add it to the list.
+                *
+                * We're depending here on the fact that this dentry
+                * was found and exists in the dcache and so must have
+                * a reference to the dentry_lock because we can't
+                * race creates. Final dput() cannot happen on it
+                * since we have it pinned, so our reference is safe.
+                */
+               dl = alias->d_fsdata;
+               mlog_bug_on_msg(!dl, "parent %llu, ino %llu\n",
+                               (unsigned long long)parent_blkno,
+                               (unsigned long long)OCFS2_I(inode)->ip_blkno);
+
+               mlog_bug_on_msg(dl->dl_parent_blkno != parent_blkno,
+                               " \"%.*s\": old parent: %llu, new: %llu\n",
+                               dentry->d_name.len, dentry->d_name.name,
+                               (unsigned long long)parent_blkno,
+                               (unsigned long long)dl->dl_parent_blkno);
+
+               mlog(0, "Found: %s\n", dl->dl_lockres.l_name);
+
+               goto out_attach;
+       }
+
+       /*
+        * There are no other aliases
+        */
+       dl = kmalloc(sizeof(*dl), GFP_NOFS);
+       if (!dl) {
+               ret = -ENOMEM;
+               mlog_errno(ret);
+               return ret;
+       }
+
+       dl->dl_count = 0;
+       /*
+        * Does this have to happen below, for all attaches, in case
+        * the struct inode gets blown away by votes?
+        */
+       dl->dl_inode = igrab(inode);
+       dl->dl_parent_blkno = parent_blkno;
+       ocfs2_dentry_lock_res_init(dl, parent_blkno, inode);
+
+out_attach:
+       spin_lock(&dentry_attach_lock);
+       dentry->d_fsdata = dl;
+       dl->dl_count++;
+       spin_unlock(&dentry_attach_lock);
+
+       /*
+        * This actually gets us our PRMODE level lock. From now on,
+        * we'll have a notification if one of these names is
+        * destroyed on another node.
+        */
+       ret = ocfs2_dentry_lock(dentry, 0);
+       if (!ret)
+               ocfs2_dentry_unlock(dentry, 0);
+       else
+               mlog_errno(ret);
+
+       dput(alias);
+
+       return ret;
+}
+
+/*
+ * ocfs2_dentry_iput() and friends.
+ *
+ * At this point, our particular dentry is detached from the inodes
+ * alias list, so there's no way that the locking code can find it.
+ *
+ * The interesting stuff happens when we determine that our lock needs
+ * to go away because this is the last subdir alias in the
+ * system. This function needs to handle a couple things:
+ *
+ * 1) Synchronizing lock shutdown with the downconvert threads. This
+ *    is already handled for us via the lockres release drop function
+ *    called in ocfs2_release_dentry_lock()
+ *
+ * 2) A race may occur when we're doing our lock shutdown and
+ *    another process wants to create a new dentry lock. Right now we
+ *    let them race, which means that for a very short while, this
+ *    node might have two locks on a lock resource. This should be a
+ *    problem though because one of them is in the process of being
+ *    thrown out.
+ */
+static void ocfs2_drop_dentry_lock(struct ocfs2_super *osb,
+                                  struct ocfs2_dentry_lock *dl)
+{
+       ocfs2_simple_drop_lockres(osb, &dl->dl_lockres);
+       ocfs2_lock_res_free(&dl->dl_lockres);
+       iput(dl->dl_inode);
+       kfree(dl);
+}
+
+void ocfs2_dentry_lock_put(struct ocfs2_super *osb,
+                          struct ocfs2_dentry_lock *dl)
+{
+       int unlock = 0;
+
+       BUG_ON(dl->dl_count == 0);
+
+       spin_lock(&dentry_attach_lock);
+       dl->dl_count--;
+       unlock = !dl->dl_count;
+       spin_unlock(&dentry_attach_lock);
+
+       if (unlock)
+               ocfs2_drop_dentry_lock(osb, dl);
+}
+
+static void ocfs2_dentry_iput(struct dentry *dentry, struct inode *inode)
+{
+       struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+
+       mlog_bug_on_msg(!dl && !(dentry->d_flags & DCACHE_DISCONNECTED),
+                       "dentry: %.*s\n", dentry->d_name.len,
+                       dentry->d_name.name);
+
+       if (!dl)
+               goto out;
+
+       mlog_bug_on_msg(dl->dl_count == 0, "dentry: %.*s, count: %u\n",
+                       dentry->d_name.len, dentry->d_name.name,
+                       dl->dl_count);
+
+       ocfs2_dentry_lock_put(OCFS2_SB(dentry->d_sb), dl);
+
+out:
+       iput(inode);
+}
+
+/*
+ * d_move(), but keep the locks in sync.
+ *
+ * When we are done, "dentry" will have the parent dir and name of
+ * "target", which will be thrown away.
+ *
+ * We manually update the lock of "dentry" if need be.
+ *
+ * "target" doesn't have it's dentry lock touched - we allow the later
+ * dput() to handle this for us.
+ *
+ * This is called during ocfs2_rename(), while holding parent
+ * directory locks. The dentries have already been deleted on other
+ * nodes via ocfs2_remote_dentry_delete().
+ *
+ * Normally, the VFS handles the d_move() for the file sytem, after
+ * the ->rename() callback. OCFS2 wants to handle this internally, so
+ * the new lock can be created atomically with respect to the cluster.
+ */
+void ocfs2_dentry_move(struct dentry *dentry, struct dentry *target,
+                      struct inode *old_dir, struct inode *new_dir)
+{
+       int ret;
+       struct ocfs2_super *osb = OCFS2_SB(old_dir->i_sb);
+       struct inode *inode = dentry->d_inode;
+
+       /*
+        * Move within the same directory, so the actual lock info won't
+        * change.
+        *
+        * XXX: Is there any advantage to dropping the lock here?
+        */
+       if (old_dir == new_dir)
+               goto out_move;
+
+       ocfs2_dentry_lock_put(osb, dentry->d_fsdata);
+
+       dentry->d_fsdata = NULL;
+       ret = ocfs2_dentry_attach_lock(dentry, inode, OCFS2_I(new_dir)->ip_blkno);
+       if (ret)
+               mlog_errno(ret);
+
+out_move:
+       d_move(dentry, target);
+}
+
 struct dentry_operations ocfs2_dentry_ops = {
        .d_revalidate           = ocfs2_dentry_revalidate,
+       .d_iput                 = ocfs2_dentry_iput,
 };
index 9007277..c091c34 100644 (file)
 
 extern struct dentry_operations ocfs2_dentry_ops;
 
+struct ocfs2_dentry_lock {
+       unsigned int            dl_count;
+       u64                     dl_parent_blkno;
+
+       /*
+        * The ocfs2_dentry_lock keeps an inode reference until
+        * dl_lockres has been destroyed. This is usually done in
+        * ->d_iput() anyway, so there should be minimal impact.
+        */
+       struct inode            *dl_inode;
+       struct ocfs2_lock_res   dl_lockres;
+};
+
+int ocfs2_dentry_attach_lock(struct dentry *dentry, struct inode *inode,
+                            u64 parent_blkno);
+
+void ocfs2_dentry_lock_put(struct ocfs2_super *osb,
+                          struct ocfs2_dentry_lock *dl);
+
+struct dentry *ocfs2_find_local_alias(struct inode *inode, u64 parent_blkno,
+                                     int skip_unhashed);
+
+void ocfs2_dentry_move(struct dentry *dentry, struct dentry *target,
+                      struct inode *old_dir, struct inode *new_dir);
+
+extern spinlock_t dentry_attach_lock;
+
 #endif /* OCFS2_DCACHE_H */
index 53652f5..cfd5cb6 100644 (file)
@@ -182,6 +182,7 @@ enum dlm_status dlmlock(struct dlm_ctxt *dlm,
                        struct dlm_lockstatus *lksb,
                        int flags,
                        const char *name,
+                       int namelen,
                        dlm_astlockfunc_t *ast,
                        void *data,
                        dlm_bastlockfunc_t *bast);
index f13a4ba..681046d 100644 (file)
@@ -320,8 +320,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
 
        res = dlm_lookup_lockres(dlm, name, locklen);
        if (!res) {
-               mlog(ML_ERROR, "got %sast for unknown lockres! "
-                              "cookie=%u:%llu, name=%.*s, namelen=%u\n",
+               mlog(0, "got %sast for unknown lockres! "
+                    "cookie=%u:%llu, name=%.*s, namelen=%u\n",
                     past->type == DLM_AST ? "" : "b",
                     dlm_get_lock_cookie_node(cookie),
                     dlm_get_lock_cookie_seq(cookie),
@@ -462,7 +462,7 @@ int dlm_send_proxy_ast_msg(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                        mlog(ML_ERROR, "sent AST to node %u, it returned "
                             "DLM_MIGRATING!\n", lock->ml.node);
                        BUG();
-               } else if (status != DLM_NORMAL) {
+               } else if (status != DLM_NORMAL && status != DLM_IVLOCKID) {
                        mlog(ML_ERROR, "AST to node %u returned %d!\n",
                             lock->ml.node, status);
                        /* ignore it */
index 14530ee..fa96818 100644 (file)
@@ -747,6 +747,7 @@ void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
                              u8 owner);
 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
                                                 const char *lockid,
+                                                int namelen,
                                                 int flags);
 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
                                          const char *name,
index 5ca57ec..42a1b91 100644 (file)
@@ -540,8 +540,8 @@ static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
 
 enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
                        struct dlm_lockstatus *lksb, int flags,
-                       const char *name, dlm_astlockfunc_t *ast, void *data,
-                       dlm_bastlockfunc_t *bast)
+                       const char *name, int namelen, dlm_astlockfunc_t *ast,
+                       void *data, dlm_bastlockfunc_t *bast)
 {
        enum dlm_status status;
        struct dlm_lock_resource *res = NULL;
@@ -571,7 +571,7 @@ enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
        recovery = (flags & LKM_RECOVERY);
 
        if (recovery &&
-           (!dlm_is_recovery_lock(name, strlen(name)) || convert) ) {
+           (!dlm_is_recovery_lock(name, namelen) || convert) ) {
                dlm_error(status);
                goto error;
        }
@@ -643,7 +643,7 @@ retry_convert:
                }
 
                status = DLM_IVBUFLEN;
-               if (strlen(name) > DLM_LOCKID_NAME_MAX || strlen(name) < 1) {
+               if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) {
                        dlm_error(status);
                        goto error;
                }
@@ -659,7 +659,7 @@ retry_convert:
                        dlm_wait_for_recovery(dlm);
 
                /* find or create the lock resource */
-               res = dlm_get_lock_resource(dlm, name, flags);
+               res = dlm_get_lock_resource(dlm, name, namelen, flags);
                if (!res) {
                        status = DLM_IVLOCKID;
                        dlm_error(status);
index 9503240..f784177 100644 (file)
@@ -740,6 +740,7 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
  */
 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
                                          const char *lockid,
+                                         int namelen,
                                          int flags)
 {
        struct dlm_lock_resource *tmpres=NULL, *res=NULL;
@@ -748,13 +749,12 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
        int blocked = 0;
        int ret, nodenum;
        struct dlm_node_iter iter;
-       unsigned int namelen, hash;
+       unsigned int hash;
        int tries = 0;
        int bit, wait_on_recovery = 0;
 
        BUG_ON(!lockid);
 
-       namelen = strlen(lockid);
        hash = dlm_lockid_hash(lockid, namelen);
 
        mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
index 594745f..9d950d7 100644 (file)
@@ -2285,7 +2285,8 @@ again:
        memset(&lksb, 0, sizeof(lksb));
 
        ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
-                     DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
+                     DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN,
+                     dlm_reco_ast, dlm, dlm_reco_bast);
 
        mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
             dlm->name, ret, lksb.status);
index e641b08..eead48b 100644 (file)
@@ -102,10 +102,10 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
        spin_unlock(&lockres->l_lock);
 }
 
-#define user_log_dlm_error(_func, _stat, _lockres) do {                \
-       mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "  \
-               "resource %s: %s\n", dlm_errname(_stat), _func, \
-               _lockres->l_name, dlm_errmsg(_stat));           \
+#define user_log_dlm_error(_func, _stat, _lockres) do {                        \
+       mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "          \
+               "resource %.*s: %s\n", dlm_errname(_stat), _func,       \
+               _lockres->l_namelen, _lockres->l_name, dlm_errmsg(_stat)); \
 } while (0)
 
 /* WARNING: This function lives in a world where the only three lock
@@ -127,21 +127,22 @@ static void user_ast(void *opaque)
        struct user_lock_res *lockres = opaque;
        struct dlm_lockstatus *lksb;
 
-       mlog(0, "AST fired for lockres %s\n", lockres->l_name);
+       mlog(0, "AST fired for lockres %.*s\n", lockres->l_namelen,
+            lockres->l_name);
 
        spin_lock(&lockres->l_lock);
 
        lksb = &(lockres->l_lksb);
        if (lksb->status != DLM_NORMAL) {
-               mlog(ML_ERROR, "lksb status value of %u on lockres %s\n",
-                    lksb->status, lockres->l_name);
+               mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
+                    lksb->status, lockres->l_namelen, lockres->l_name);
                spin_unlock(&lockres->l_lock);
                return;
        }
 
        mlog_bug_on_msg(lockres->l_requested == LKM_IVMODE,
-                       "Lockres %s, requested ivmode. flags 0x%x\n",
-                       lockres->l_name, lockres->l_flags);
+                       "Lockres %.*s, requested ivmode. flags 0x%x\n",
+                       lockres->l_namelen, lockres->l_name, lockres->l_flags);
 
        /* we're downconverting. */
        if (lockres->l_requested < lockres->l_level) {
@@ -213,8 +214,8 @@ static void user_bast(void *opaque, int level)
 {
        struct user_lock_res *lockres = opaque;
 
-       mlog(0, "Blocking AST fired for lockres %s. Blocking level %d\n",
-               lockres->l_name, level);
+       mlog(0, "Blocking AST fired for lockres %.*s. Blocking level %d\n",
+            lockres->l_namelen, lockres->l_name, level);
 
        spin_lock(&lockres->l_lock);
        lockres->l_flags |= USER_LOCK_BLOCKED;
@@ -231,7 +232,8 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
 {
        struct user_lock_res *lockres = opaque;
 
-       mlog(0, "UNLOCK AST called on lock %s\n", lockres->l_name);
+       mlog(0, "UNLOCK AST called on lock %.*s\n", lockres->l_namelen,
+            lockres->l_name);
 
        if (status != DLM_NORMAL && status != DLM_CANCELGRANT)
                mlog(ML_ERROR, "Dlm returns status %d\n", status);
@@ -244,8 +246,6 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
            && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
                lockres->l_level = LKM_IVMODE;
        } else if (status == DLM_CANCELGRANT) {
-               mlog(0, "Lock %s, cancel fails, flags 0x%x\n",
-                    lockres->l_name, lockres->l_flags);
                /* We tried to cancel a convert request, but it was
                 * already granted. Don't clear the busy flag - the
                 * ast should've done this already. */
@@ -255,8 +255,6 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
        } else {
                BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
                /* Cancel succeeded, we want to re-queue */
-               mlog(0, "Lock %s, cancel succeeds, flags 0x%x\n",
-                    lockres->l_name, lockres->l_flags);
                lockres->l_requested = LKM_IVMODE; /* cancel an
                                                    * upconvert
                                                    * request. */
@@ -287,13 +285,14 @@ static void user_dlm_unblock_lock(void *opaque)
        struct user_lock_res *lockres = (struct user_lock_res *) opaque;
        struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
 
-       mlog(0, "processing lockres %s\n", lockres->l_name);
+       mlog(0, "processing lockres %.*s\n", lockres->l_namelen,
+            lockres->l_name);
 
        spin_lock(&lockres->l_lock);
 
        mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
-                       "Lockres %s, flags 0x%x\n",
-                       lockres->l_name, lockres->l_flags);
+                       "Lockres %.*s, flags 0x%x\n",
+                       lockres->l_namelen, lockres->l_name, lockres->l_flags);
 
        /* notice that we don't clear USER_LOCK_BLOCKED here. If it's
         * set, we want user_ast clear it. */
@@ -305,22 +304,16 @@ static void user_dlm_unblock_lock(void *opaque)
         * flag, and finally we might get another bast which re-queues
         * us before our ast for the downconvert is called. */
        if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
-               mlog(0, "Lockres %s, flags 0x%x: queued but not blocking\n",
-                       lockres->l_name, lockres->l_flags);
                spin_unlock(&lockres->l_lock);
                goto drop_ref;
        }
 
        if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
-               mlog(0, "lock is in teardown so we do nothing\n");
                spin_unlock(&lockres->l_lock);
                goto drop_ref;
        }
 
        if (lockres->l_flags & USER_LOCK_BUSY) {
-               mlog(0, "Cancel lock %s, flags 0x%x\n",
-                    lockres->l_name, lockres->l_flags);
-
                if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
                        spin_unlock(&lockres->l_lock);
                        goto drop_ref;
@@ -372,6 +365,7 @@ static void user_dlm_unblock_lock(void *opaque)
                         &lockres->l_lksb,
                         LKM_CONVERT|LKM_VALBLK,
                         lockres->l_name,
+                        lockres->l_namelen,
                         user_ast,
                         lockres,
                         user_bast);
@@ -420,16 +414,16 @@ int user_dlm_cluster_lock(struct user_lock_res *lockres,
 
        if (level != LKM_EXMODE &&
            level != LKM_PRMODE) {
-               mlog(ML_ERROR, "lockres %s: invalid request!\n",
-                    lockres->l_name);
+               mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
+                    lockres->l_namelen, lockres->l_name);
                status = -EINVAL;
                goto bail;
        }
 
-       mlog(0, "lockres %s: asking for %s lock, passed flags = 0x%x\n",
-               lockres->l_name,
-               (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE",
-               lkm_flags);
+       mlog(0, "lockres %.*s: asking for %s lock, passed flags = 0x%x\n",
+            lockres->l_namelen, lockres->l_name,
+            (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE",
+            lkm_flags);
 
 again:
        if (signal_pending(current)) {
@@ -474,15 +468,13 @@ again:
                BUG_ON(level == LKM_IVMODE);
                BUG_ON(level == LKM_NLMODE);
 
-               mlog(0, "lock %s, get lock from %d to level = %d\n",
-                       lockres->l_name, lockres->l_level, level);
-
                /* call dlm_lock to upgrade lock now */
                status = dlmlock(dlm,
                                 level,
                                 &lockres->l_lksb,
                                 local_flags,
                                 lockres->l_name,
+                                lockres->l_namelen,
                                 user_ast,
                                 lockres,
                                 user_bast);
@@ -498,9 +490,6 @@ again:
                        goto bail;
                }
 
-               mlog(0, "lock %s, successfull return from dlmlock\n",
-                       lockres->l_name);
-
                user_wait_on_busy_lock(lockres);
                goto again;
        }
@@ -508,9 +497,6 @@ again:
        user_dlm_inc_holders(lockres, level);
        spin_unlock(&lockres->l_lock);
 
-       mlog(0, "lockres %s: Got %s lock!\n", lockres->l_name,
-               (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE");
-
        status = 0;
 bail:
        return status;
@@ -538,13 +524,11 @@ void user_dlm_cluster_unlock(struct user_lock_res *lockres,
 {
        if (level != LKM_EXMODE &&
            level != LKM_PRMODE) {
-               mlog(ML_ERROR, "lockres %s: invalid request!\n", lockres->l_name);
+               mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
+                    lockres->l_namelen, lockres->l_name);
                return;
        }
 
-       mlog(0, "lockres %s: dropping %s lock\n", lockres->l_name,
-               (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE");
-
        spin_lock(&lockres->l_lock);
        user_dlm_dec_holders(lockres, level);
        __user_dlm_cond_queue_lockres(lockres);
@@ -602,6 +586,7 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres,
        memcpy(lockres->l_name,
               dentry->d_name.name,
               dentry->d_name.len);
+       lockres->l_namelen = dentry->d_name.len;
 }
 
 int user_dlm_destroy_lock(struct user_lock_res *lockres)
@@ -609,11 +594,10 @@ int user_dlm_destroy_lock(struct user_lock_res *lockres)
        int status = -EBUSY;
        struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
 
-       mlog(0, "asked to destroy %s\n", lockres->l_name);
+       mlog(0, "asked to destroy %.*s\n", lockres->l_namelen, lockres->l_name);
 
        spin_lock(&lockres->l_lock);
        if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
-               mlog(0, "Lock is already torn down\n");
                spin_unlock(&lockres->l_lock);
                return 0;
        }
@@ -623,8 +607,6 @@ int user_dlm_destroy_lock(struct user_lock_res *lockres)
        while (lockres->l_flags & USER_LOCK_BUSY) {
                spin_unlock(&lockres->l_lock);
 
-               mlog(0, "lock %s is busy\n", lockres->l_name);
-
                user_wait_on_busy_lock(lockres);
 
                spin_lock(&lockres->l_lock);
@@ -632,14 +614,12 @@ int user_dlm_destroy_lock(struct user_lock_res *lockres)
 
        if (lockres->l_ro_holders || lockres->l_ex_holders) {
                spin_unlock(&lockres->l_lock);
-               mlog(0, "lock %s has holders\n", lockres->l_name);
                goto bail;
        }
 
        status = 0;
        if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
                spin_unlock(&lockres->l_lock);
-               mlog(0, "lock %s is not attached\n", lockres->l_name);
                goto bail;
        }
 
@@ -647,7 +627,6 @@ int user_dlm_destroy_lock(struct user_lock_res *lockres)
        lockres->l_flags |= USER_LOCK_BUSY;
        spin_unlock(&lockres->l_lock);
 
-       mlog(0, "unlocking lockres %s\n", lockres->l_name);
        status = dlmunlock(dlm,
                           &lockres->l_lksb,
                           LKM_VALBLK,
index 04178bc..c400e93 100644 (file)
@@ -53,6 +53,7 @@ struct user_lock_res {
 
 #define USER_DLM_LOCK_ID_MAX_LEN  32
        char                     l_name[USER_DLM_LOCK_ID_MAX_LEN];
+       int                      l_namelen;
        int                      l_level;
        unsigned int             l_ro_holders;
        unsigned int             l_ex_holders;
index 151b417..de88706 100644 (file)
@@ -46,6 +46,7 @@
 #include "ocfs2.h"
 
 #include "alloc.h"
+#include "dcache.h"
 #include "dlmglue.h"
 #include "extent_map.h"
 #include "heartbeat.h"
@@ -66,78 +67,161 @@ struct ocfs2_mask_waiter {
        unsigned long           mw_goal;
 };
 
-static void ocfs2_inode_ast_func(void *opaque);
-static void ocfs2_inode_bast_func(void *opaque,
-                                 int level);
-static void ocfs2_super_ast_func(void *opaque);
-static void ocfs2_super_bast_func(void *opaque,
-                                 int level);
-static void ocfs2_rename_ast_func(void *opaque);
-static void ocfs2_rename_bast_func(void *opaque,
-                                  int level);
-
-/* so far, all locks have gotten along with the same unlock ast */
-static void ocfs2_unlock_ast_func(void *opaque,
-                                 enum dlm_status status);
-static int ocfs2_do_unblock_meta(struct inode *inode,
-                                int *requeue);
-static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
-                             int *requeue);
-static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
-                             int *requeue);
-static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
-                             int *requeue);
-static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
-                                 int *requeue);
-typedef void (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
-static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
-                                     struct ocfs2_lock_res *lockres,
-                                     int *requeue,
-                                     ocfs2_convert_worker_t *worker);
+static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
+static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
 
+/*
+ * Return value from ->downconvert_worker functions.
+ *
+ * These control the precise actions of ocfs2_unblock_lock()
+ * and ocfs2_process_blocked_lock()
+ *
+ */
+enum ocfs2_unblock_action {
+       UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
+       UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
+                                     * ->post_unlock callback */
+       UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
+                                     * ->post_unlock() callback. */
+};
+
+struct ocfs2_unblock_ctl {
+       int requeue;
+       enum ocfs2_unblock_action unblock_action;
+};
+
+static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
+                                       int new_level);
+static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
+
+static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
+                                    int blocking);
+
+static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
+                                      int blocking);
+
+static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
+                                    struct ocfs2_lock_res *lockres);
+
+/*
+ * OCFS2 Lock Resource Operations
+ *
+ * These fine tune the behavior of the generic dlmglue locking infrastructure.
+ *
+ * The most basic of lock types can point ->l_priv to their respective
+ * struct ocfs2_super and allow the default actions to manage things.
+ *
+ * Right now, each lock type also needs to implement an init function,
+ * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
+ * should be called when the lock is no longer needed (i.e., object
+ * destruction time).
+ */
 struct ocfs2_lock_res_ops {
-       void (*ast)(void *);
-       void (*bast)(void *, int);
-       void (*unlock_ast)(void *, enum dlm_status);
-       int  (*unblock)(struct ocfs2_lock_res *, int *);
+       /*
+        * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
+        * this callback if ->l_priv is not an ocfs2_super pointer
+        */
+       struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
+
+       /*
+        * Optionally called in the downconvert (or "vote") thread
+        * after a successful downconvert. The lockres will not be
+        * referenced after this callback is called, so it is safe to
+        * free memory, etc.
+        *
+        * The exact semantics of when this is called are controlled
+        * by ->downconvert_worker()
+        */
+       void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
+
+       /*
+        * Allow a lock type to add checks to determine whether it is
+        * safe to downconvert a lock. Return 0 to re-queue the
+        * downconvert at a later time, nonzero to continue.
+        *
+        * For most locks, the default checks that there are no
+        * incompatible holders are sufficient.
+        *
+        * Called with the lockres spinlock held.
+        */
+       int (*check_downconvert)(struct ocfs2_lock_res *, int);
+
+       /*
+        * Allows a lock type to populate the lock value block. This
+        * is called on downconvert, and when we drop a lock.
+        *
+        * Locks that want to use this should set LOCK_TYPE_USES_LVB
+        * in the flags field.
+        *
+        * Called with the lockres spinlock held.
+        */
+       void (*set_lvb)(struct ocfs2_lock_res *);
+
+       /*
+        * Called from the downconvert thread when it is determined
+        * that a lock will be downconverted. This is called without
+        * any locks held so the function can do work that might
+        * schedule (syncing out data, etc).
+        *
+        * This should return any one of the ocfs2_unblock_action
+        * values, depending on what it wants the thread to do.
+        */
+       int (*downconvert_worker)(struct ocfs2_lock_res *, int);
+
+       /*
+        * LOCK_TYPE_* flags which describe the specific requirements
+        * of a lock type. Descriptions of each individual flag follow.
+        */
+       int flags;
 };
 
+/*
+ * Some locks want to "refresh" potentially stale data when a
+ * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
+ * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
+ * individual lockres l_flags member from the ast function. It is
+ * expected that the locking wrapper will clear the
+ * OCFS2_LOCK_NEEDS_REFRESH flag when done.
+ */
+#define LOCK_TYPE_REQUIRES_REFRESH 0x1
+
+/*
+ * Indicate that a lock type makes use of the lock value block. The
+ * ->set_lvb lock type callback must be defined.
+ */
+#define LOCK_TYPE_USES_LVB             0x2
+
 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
-       .ast            = ocfs2_inode_ast_func,
-       .bast           = ocfs2_inode_bast_func,
-       .unlock_ast     = ocfs2_unlock_ast_func,
-       .unblock        = ocfs2_unblock_inode_lock,
+       .get_osb        = ocfs2_get_inode_osb,
+       .flags          = 0,
 };
 
 static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
-       .ast            = ocfs2_inode_ast_func,
-       .bast           = ocfs2_inode_bast_func,
-       .unlock_ast     = ocfs2_unlock_ast_func,
-       .unblock        = ocfs2_unblock_meta,
+       .get_osb        = ocfs2_get_inode_osb,
+       .check_downconvert = ocfs2_check_meta_downconvert,
+       .set_lvb        = ocfs2_set_meta_lvb,
+       .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
 };
 
-static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
-                                     int blocking);
-
 static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
-       .ast            = ocfs2_inode_ast_func,
-       .bast           = ocfs2_inode_bast_func,
-       .unlock_ast     = ocfs2_unlock_ast_func,
-       .unblock        = ocfs2_unblock_data,
+       .get_osb        = ocfs2_get_inode_osb,
+       .downconvert_worker = ocfs2_data_convert_worker,
+       .flags          = 0,
 };
 
 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
-       .ast            = ocfs2_super_ast_func,
-       .bast           = ocfs2_super_bast_func,
-       .unlock_ast     = ocfs2_unlock_ast_func,
-       .unblock        = ocfs2_unblock_osb_lock,
+       .flags          = LOCK_TYPE_REQUIRES_REFRESH,
 };
 
 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
-       .ast            = ocfs2_rename_ast_func,
-       .bast           = ocfs2_rename_bast_func,
-       .unlock_ast     = ocfs2_unlock_ast_func,
-       .unblock        = ocfs2_unblock_osb_lock,
+       .flags          = 0,
+};
+
+static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
+       .get_osb        = ocfs2_get_dentry_osb,
+       .post_unlock    = ocfs2_dentry_post_unlock,
+       .downconvert_worker = ocfs2_dentry_convert_worker,
+       .flags          = 0,
 };
 
 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
@@ -147,29 +231,26 @@ static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
                lockres->l_type == OCFS2_LOCK_TYPE_RW;
 }
 
-static inline int ocfs2_is_super_lock(struct ocfs2_lock_res *lockres)
+static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
 {
-       return lockres->l_type == OCFS2_LOCK_TYPE_SUPER;
-}
+       BUG_ON(!ocfs2_is_inode_lock(lockres));
 
-static inline int ocfs2_is_rename_lock(struct ocfs2_lock_res *lockres)
-{
-       return lockres->l_type == OCFS2_LOCK_TYPE_RENAME;
+       return (struct inode *) lockres->l_priv;
 }
 
-static inline struct ocfs2_super *ocfs2_lock_res_super(struct ocfs2_lock_res *lockres)
+static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
 {
-       BUG_ON(!ocfs2_is_super_lock(lockres)
-              && !ocfs2_is_rename_lock(lockres));
+       BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
 
-       return (struct ocfs2_super *) lockres->l_priv;
+       return (struct ocfs2_dentry_lock *)lockres->l_priv;
 }
 
-static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
+static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
 {
-       BUG_ON(!ocfs2_is_inode_lock(lockres));
+       if (lockres->l_ops->get_osb)
+               return lockres->l_ops->get_osb(lockres);
 
-       return (struct inode *) lockres->l_priv;
+       return (struct ocfs2_super *)lockres->l_priv;
 }
 
 static int ocfs2_lock_create(struct ocfs2_super *osb,
@@ -200,25 +281,6 @@ static int ocfs2_meta_lock_update(struct inode *inode,
                                  struct buffer_head **bh);
 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
 static inline int ocfs2_highest_compat_lock_level(int level);
-static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
-                                                 struct ocfs2_lock_res *lockres,
-                                                 int new_level);
-
-static char *ocfs2_lock_type_strings[] = {
-       [OCFS2_LOCK_TYPE_META] = "Meta",
-       [OCFS2_LOCK_TYPE_DATA] = "Data",
-       [OCFS2_LOCK_TYPE_SUPER] = "Super",
-       [OCFS2_LOCK_TYPE_RENAME] = "Rename",
-       /* Need to differntiate from [R]ename.. serializing writes is the
-        * important job it does, anyway. */
-       [OCFS2_LOCK_TYPE_RW] = "Write/Read",
-};
-
-static char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
-{
-       mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type);
-       return ocfs2_lock_type_strings[type];
-}
 
 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
                                  u64 blkno,
@@ -265,13 +327,9 @@ static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
                                       struct ocfs2_lock_res *res,
                                       enum ocfs2_lock_type type,
-                                      u64 blkno,
-                                      u32 generation,
                                       struct ocfs2_lock_res_ops *ops,
                                       void *priv)
 {
-       ocfs2_build_lock_name(type, blkno, generation, res->l_name);
-
        res->l_type          = type;
        res->l_ops           = ops;
        res->l_priv          = priv;
@@ -299,6 +357,7 @@ void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
 
 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
                               enum ocfs2_lock_type type,
+                              unsigned int generation,
                               struct inode *inode)
 {
        struct ocfs2_lock_res_ops *ops;
@@ -319,9 +378,73 @@ void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
                        break;
        };
 
-       ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type,
-                                  OCFS2_I(inode)->ip_blkno,
-                                  inode->i_generation, ops, inode);
+       ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
+                             generation, res->l_name);
+       ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
+}
+
+static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
+{
+       struct inode *inode = ocfs2_lock_res_inode(lockres);
+
+       return OCFS2_SB(inode->i_sb);
+}
+
+static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
+{
+       __be64 inode_blkno_be;
+
+       memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
+              sizeof(__be64));
+
+       return be64_to_cpu(inode_blkno_be);
+}
+
+static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
+{
+       struct ocfs2_dentry_lock *dl = lockres->l_priv;
+
+       return OCFS2_SB(dl->dl_inode->i_sb);
+}
+
+void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
+                               u64 parent, struct inode *inode)
+{
+       int len;
+       u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
+       __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
+       struct ocfs2_lock_res *lockres = &dl->dl_lockres;
+
+       ocfs2_lock_res_init_once(lockres);
+
+       /*
+        * Unfortunately, the standard lock naming scheme won't work
+        * here because we have two 16 byte values to use. Instead,
+        * we'll stuff the inode number as a binary value. We still
+        * want error prints to show something without garbling the
+        * display, so drop a null byte in there before the inode
+        * number. A future version of OCFS2 will likely use all
+        * binary lock names. The stringified names have been a
+        * tremendous aid in debugging, but now that the debugfs
+        * interface exists, we can mangle things there if need be.
+        *
+        * NOTE: We also drop the standard "pad" value (the total lock
+        * name size stays the same though - the last part is all
+        * zeros due to the memset in ocfs2_lock_res_init_once()
+        */
+       len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
+                      "%c%016llx",
+                      ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
+                      (long long)parent);
+
+       BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
+
+       memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
+              sizeof(__be64));
+
+       ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
+                                  OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
+                                  dl);
 }
 
 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
@@ -330,8 +453,9 @@ static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
        /* Superblock lockres doesn't come from a slab so we call init
         * once on it manually.  */
        ocfs2_lock_res_init_once(res);
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
+                             0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
-                                  OCFS2_SUPER_BLOCK_BLKNO, 0,
                                   &ocfs2_super_lops, osb);
 }
 
@@ -341,7 +465,8 @@ static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
        /* Rename lockres doesn't come from a slab so we call init
         * once on it manually.  */
        ocfs2_lock_res_init_once(res);
-       ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 0, 0,
+       ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
+       ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
                                   &ocfs2_rename_lops, osb);
 }
 
@@ -495,7 +620,8 @@ static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lo
         * information is already up to data. Convert from NL to
         * *anything* however should mark ourselves as needing an
         * update */
-       if (lockres->l_level == LKM_NLMODE)
+       if (lockres->l_level == LKM_NLMODE &&
+           lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
        lockres->l_level = lockres->l_requested;
@@ -512,7 +638,8 @@ static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *loc
        BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
 
        if (lockres->l_requested > LKM_NLMODE &&
-           !(lockres->l_flags & OCFS2_LOCK_LOCAL))
+           !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
+           lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
        lockres->l_level = lockres->l_requested;
@@ -522,68 +649,6 @@ static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *loc
        mlog_exit_void();
 }
 
-static void ocfs2_inode_ast_func(void *opaque)
-{
-       struct ocfs2_lock_res *lockres = opaque;
-       struct inode *inode;
-       struct dlm_lockstatus *lksb;
-       unsigned long flags;
-
-       mlog_entry_void();
-
-       inode = ocfs2_lock_res_inode(lockres);
-
-       mlog(0, "AST fired for inode %llu, l_action = %u, type = %s\n",
-            (unsigned long long)OCFS2_I(inode)->ip_blkno, lockres->l_action,
-            ocfs2_lock_type_string(lockres->l_type));
-
-       BUG_ON(!ocfs2_is_inode_lock(lockres));
-
-       spin_lock_irqsave(&lockres->l_lock, flags);
-
-       lksb = &(lockres->l_lksb);
-       if (lksb->status != DLM_NORMAL) {
-               mlog(ML_ERROR, "ocfs2_inode_ast_func: lksb status value of %u "
-                    "on inode %llu\n", lksb->status,
-                    (unsigned long long)OCFS2_I(inode)->ip_blkno);
-               spin_unlock_irqrestore(&lockres->l_lock, flags);
-               mlog_exit_void();
-               return;
-       }
-
-       switch(lockres->l_action) {
-       case OCFS2_AST_ATTACH:
-               ocfs2_generic_handle_attach_action(lockres);
-               lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
-               break;
-       case OCFS2_AST_CONVERT:
-               ocfs2_generic_handle_convert_action(lockres);
-               break;
-       case OCFS2_AST_DOWNCONVERT:
-               ocfs2_generic_handle_downconvert_action(lockres);
-               break;
-       default:
-               mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
-                    "lockres flags = 0x%lx, unlock action: %u\n",
-                    lockres->l_name, lockres->l_action, lockres->l_flags,
-                    lockres->l_unlock_action);
-
-               BUG();
-       }
-
-       /* data and rw locking ignores refresh flag for now. */
-       if (lockres->l_type != OCFS2_LOCK_TYPE_META)
-               lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
-
-       /* set it to something invalid so if we get called again we
-        * can catch it. */
-       lockres->l_action = OCFS2_AST_INVALID;
-       spin_unlock_irqrestore(&lockres->l_lock, flags);
-       wake_up(&lockres->l_event);
-
-       mlog_exit_void();
-}
-
 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
                                     int level)
 {
@@ -610,54 +675,33 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
        return needs_downconvert;
 }
 
-static void ocfs2_generic_bast_func(struct ocfs2_super *osb,
-                                   struct ocfs2_lock_res *lockres,
-                                   int level)
+static void ocfs2_blocking_ast(void *opaque, int level)
 {
+       struct ocfs2_lock_res *lockres = opaque;
+       struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        int needs_downconvert;
        unsigned long flags;
 
-       mlog_entry_void();
-
        BUG_ON(level <= LKM_NLMODE);
 
+       mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
+            lockres->l_name, level, lockres->l_level,
+            ocfs2_lock_type_string(lockres->l_type));
+
        spin_lock_irqsave(&lockres->l_lock, flags);
        needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
        if (needs_downconvert)
                ocfs2_schedule_blocked_lock(osb, lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       ocfs2_kick_vote_thread(osb);
-
        wake_up(&lockres->l_event);
-       mlog_exit_void();
-}
-
-static void ocfs2_inode_bast_func(void *opaque, int level)
-{
-       struct ocfs2_lock_res *lockres = opaque;
-       struct inode *inode;
-       struct ocfs2_super *osb;
 
-       mlog_entry_void();
-
-       BUG_ON(!ocfs2_is_inode_lock(lockres));
-
-       inode = ocfs2_lock_res_inode(lockres);
-       osb = OCFS2_SB(inode->i_sb);
-
-       mlog(0, "BAST fired for inode %llu, blocking %d, level %d type %s\n",
-            (unsigned long long)OCFS2_I(inode)->ip_blkno, level,
-            lockres->l_level, ocfs2_lock_type_string(lockres->l_type));
-
-       ocfs2_generic_bast_func(osb, lockres, level);
-
-       mlog_exit_void();
+       ocfs2_kick_vote_thread(osb);
 }
 
-static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres,
-                                  int ignore_refresh)
+static void ocfs2_locking_ast(void *opaque)
 {
+       struct ocfs2_lock_res *lockres = opaque;
        struct dlm_lockstatus *lksb = &lockres->l_lksb;
        unsigned long flags;
 
@@ -673,6 +717,7 @@ static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres,
        switch(lockres->l_action) {
        case OCFS2_AST_ATTACH:
                ocfs2_generic_handle_attach_action(lockres);
+               lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
                break;
        case OCFS2_AST_CONVERT:
                ocfs2_generic_handle_convert_action(lockres);
@@ -681,80 +726,19 @@ static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres,
                ocfs2_generic_handle_downconvert_action(lockres);
                break;
        default:
+               mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
+                    "lockres flags = 0x%lx, unlock action: %u\n",
+                    lockres->l_name, lockres->l_action, lockres->l_flags,
+                    lockres->l_unlock_action);
                BUG();
        }
 
-       if (ignore_refresh)
-               lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
-
        /* set it to something invalid so if we get called again we
         * can catch it. */
        lockres->l_action = OCFS2_AST_INVALID;
-       spin_unlock_irqrestore(&lockres->l_lock, flags);
 
        wake_up(&lockres->l_event);
-}
-
-static void ocfs2_super_ast_func(void *opaque)
-{
-       struct ocfs2_lock_res *lockres = opaque;
-
-       mlog_entry_void();
-       mlog(0, "Superblock AST fired\n");
-
-       BUG_ON(!ocfs2_is_super_lock(lockres));
-       ocfs2_generic_ast_func(lockres, 0);
-
-       mlog_exit_void();
-}
-
-static void ocfs2_super_bast_func(void *opaque,
-                                 int level)
-{
-       struct ocfs2_lock_res *lockres = opaque;
-       struct ocfs2_super *osb;
-
-       mlog_entry_void();
-       mlog(0, "Superblock BAST fired\n");
-
-       BUG_ON(!ocfs2_is_super_lock(lockres));
-               osb = ocfs2_lock_res_super(lockres);
-       ocfs2_generic_bast_func(osb, lockres, level);
-
-       mlog_exit_void();
-}
-
-static void ocfs2_rename_ast_func(void *opaque)
-{
-       struct ocfs2_lock_res *lockres = opaque;
-
-       mlog_entry_void();
-
-       mlog(0, "Rename AST fired\n");
-
-       BUG_ON(!ocfs2_is_rename_lock(lockres));
-
-       ocfs2_generic_ast_func(lockres, 1);
-
-       mlog_exit_void();
-}
-
-static void ocfs2_rename_bast_func(void *opaque,
-                                  int level)
-{
-       struct ocfs2_lock_res *lockres = opaque;
-       struct ocfs2_super *osb;
-
-       mlog_entry_void();
-
-       mlog(0, "Rename BAST fired\n");
-
-       BUG_ON(!ocfs2_is_rename_lock(lockres));
-
-       osb = ocfs2_lock_res_super(lockres);
-       ocfs2_generic_bast_func(osb, lockres, level);
-
-       mlog_exit_void();
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
 }
 
 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
@@ -810,9 +794,10 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
                         &lockres->l_lksb,
                         dlm_flags,
                         lockres->l_name,
-                        lockres->l_ops->ast,
+                        OCFS2_LOCK_ID_MAX_LEN - 1,
+                        ocfs2_locking_ast,
                         lockres,
-                        lockres->l_ops->bast);
+                        ocfs2_blocking_ast);
        if (status != DLM_NORMAL) {
                ocfs2_log_dlm_error("dlmlock", status, lockres);
                ret = -EINVAL;
@@ -930,6 +915,9 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,
 
        ocfs2_init_mask_waiter(&mw);
 
+       if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
+               lkm_flags |= LKM_VALBLK;
+
 again:
        wait = 0;
 
@@ -997,11 +985,12 @@ again:
                status = dlmlock(osb->dlm,
                                 level,
                                 &lockres->l_lksb,
-                                lkm_flags|LKM_CONVERT|LKM_VALBLK,
+                                lkm_flags|LKM_CONVERT,
                                 lockres->l_name,
-                                lockres->l_ops->ast,
+                                OCFS2_LOCK_ID_MAX_LEN - 1,
+                                ocfs2_locking_ast,
                                 lockres,
-                                lockres->l_ops->bast);
+                                ocfs2_blocking_ast);
                if (status != DLM_NORMAL) {
                        if ((lkm_flags & LKM_NOQUEUE) &&
                            (status == DLM_NOTQUEUED))
@@ -1074,18 +1063,21 @@ static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
        mlog_exit_void();
 }
 
-static int ocfs2_create_new_inode_lock(struct inode *inode,
-                                      struct ocfs2_lock_res *lockres)
+int ocfs2_create_new_lock(struct ocfs2_super *osb,
+                         struct ocfs2_lock_res *lockres,
+                         int ex,
+                         int local)
 {
-       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+       int level =  ex ? LKM_EXMODE : LKM_PRMODE;
        unsigned long flags;
+       int lkm_flags = local ? LKM_LOCAL : 0;
 
        spin_lock_irqsave(&lockres->l_lock, flags);
        BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
        lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       return ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL);
+       return ocfs2_lock_create(osb, lockres, level, lkm_flags);
 }
 
 /* Grants us an EX lock on the data and metadata resources, skipping
@@ -1097,6 +1089,7 @@ static int ocfs2_create_new_inode_lock(struct inode *inode,
 int ocfs2_create_new_inode_locks(struct inode *inode)
 {
        int ret;
+       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
        BUG_ON(!inode);
        BUG_ON(!ocfs2_inode_is_new(inode));
@@ -1113,22 +1106,23 @@ int ocfs2_create_new_inode_locks(struct inode *inode)
         * on a resource which has an invalid one -- we'll set it
         * valid when we release the EX. */
 
-       ret = ocfs2_create_new_inode_lock(inode,
-                                         &OCFS2_I(inode)->ip_rw_lockres);
+       ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
        if (ret) {
                mlog_errno(ret);
                goto bail;
        }
 
-       ret = ocfs2_create_new_inode_lock(inode,
-                                         &OCFS2_I(inode)->ip_meta_lockres);
+       /*
+        * We don't want to use LKM_LOCAL on a meta data lock as they
+        * don't use a generation in their lock names.
+        */
+       ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
        if (ret) {
                mlog_errno(ret);
                goto bail;
        }
 
-       ret = ocfs2_create_new_inode_lock(inode,
-                                         &OCFS2_I(inode)->ip_data_lockres);
+       ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
        if (ret) {
                mlog_errno(ret);
                goto bail;
@@ -1317,7 +1311,17 @@ static void __ocfs2_stuff_meta_lvb(struct inode *inode)
 
        lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
 
-       lvb->lvb_version   = cpu_to_be32(OCFS2_LVB_VERSION);
+       /*
+        * Invalidate the LVB of a deleted inode - this way other
+        * nodes are forced to go to disk and discover the new inode
+        * status.
+        */
+       if (oi->ip_flags & OCFS2_INODE_DELETED) {
+               lvb->lvb_version = 0;
+               goto out;
+       }
+
+       lvb->lvb_version   = OCFS2_LVB_VERSION;
        lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
        lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
        lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
@@ -1331,7 +1335,9 @@ static void __ocfs2_stuff_meta_lvb(struct inode *inode)
        lvb->lvb_imtime_packed =
                cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
        lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
+       lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
 
+out:
        mlog_meta_lvb(0, lockres);
 
        mlog_exit_void();
@@ -1386,11 +1392,13 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
        mlog_exit_void();
 }
 
-static inline int ocfs2_meta_lvb_is_trustable(struct ocfs2_lock_res *lockres)
+static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
+                                             struct ocfs2_lock_res *lockres)
 {
        struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
 
-       if (be32_to_cpu(lvb->lvb_version) == OCFS2_LVB_VERSION)
+       if (lvb->lvb_version == OCFS2_LVB_VERSION
+           && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
                return 1;
        return 0;
 }
@@ -1487,7 +1495,7 @@ static int ocfs2_meta_lock_update(struct inode *inode,
         * map (directories, bitmap files, etc) */
        ocfs2_extent_map_trunc(inode, 0);
 
-       if (ocfs2_meta_lvb_is_trustable(lockres)) {
+       if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
                mlog(0, "Trusting LVB on inode %llu\n",
                     (unsigned long long)oi->ip_blkno);
                ocfs2_refresh_inode_from_lvb(inode);
@@ -1628,6 +1636,18 @@ int ocfs2_meta_lock_full(struct inode *inode,
                wait_event(osb->recovery_event,
                           ocfs2_node_map_is_empty(osb, &osb->recovery_map));
 
+       /*
+        * We only see this flag if we're being called from
+        * ocfs2_read_locked_inode(). It means we're locking an inode
+        * which hasn't been populated yet, so clear the refresh flag
+        * and let the caller handle it.
+        */
+       if (inode->i_state & I_NEW) {
+               status = 0;
+               ocfs2_complete_lock_res_refresh(lockres, 0);
+               goto bail;
+       }
+
        /* This is fun. The caller may want a bh back, or it may
         * not. ocfs2_meta_lock_update definitely wants one in, but
         * may or may not read one, depending on what's in the
@@ -1807,6 +1827,34 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb)
        ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
 }
 
+int ocfs2_dentry_lock(struct dentry *dentry, int ex)
+{
+       int ret;
+       int level = ex ? LKM_EXMODE : LKM_PRMODE;
+       struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+       struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
+
+       BUG_ON(!dl);
+
+       if (ocfs2_is_hard_readonly(osb))
+               return -EROFS;
+
+       ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
+       if (ret < 0)
+               mlog_errno(ret);
+
+       return ret;
+}
+
+void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
+{
+       int level = ex ? LKM_EXMODE : LKM_PRMODE;
+       struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+       struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
+
+       ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
+}
+
 /* Reference counting of the dlm debug structure. We want this because
  * open references on the debug inodes can live on after a mount, so
  * we can't rely on the ocfs2_super to always exist. */
@@ -1937,9 +1985,16 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
        if (!lockres)
                return -EINVAL;
 
-       seq_printf(m, "0x%x\t"
-                  "%.*s\t"
-                  "%d\t"
+       seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
+
+       if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
+               seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
+                          lockres->l_name,
+                          (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
+       else
+               seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
+
+       seq_printf(m, "%d\t"
                   "0x%lx\t"
                   "0x%x\t"
                   "0x%x\t"
@@ -1947,8 +2002,6 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
                   "%u\t"
                   "%d\t"
                   "%d\t",
-                  OCFS2_DLM_DEBUG_STR_VERSION,
-                  OCFS2_LOCK_ID_MAX_LEN, lockres->l_name,
                   lockres->l_level,
                   lockres->l_flags,
                   lockres->l_action,
@@ -2138,7 +2191,7 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
        mlog_exit_void();
 }
 
-static void ocfs2_unlock_ast_func(void *opaque, enum dlm_status status)
+static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
 {
        struct ocfs2_lock_res *lockres = opaque;
        unsigned long flags;
@@ -2194,24 +2247,20 @@ complete_unlock:
        mlog_exit_void();
 }
 
-typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);
-
-struct drop_lock_cb {
-       ocfs2_pre_drop_cb_t     *drop_func;
-       void                    *drop_data;
-};
-
 static int ocfs2_drop_lock(struct ocfs2_super *osb,
-                          struct ocfs2_lock_res *lockres,
-                          struct drop_lock_cb *dcb)
+                          struct ocfs2_lock_res *lockres)
 {
        enum dlm_status status;
        unsigned long flags;
+       int lkm_flags = 0;
 
        /* We didn't get anywhere near actually using this lockres. */
        if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
                goto out;
 
+       if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
+               lkm_flags |= LKM_VALBLK;
+
        spin_lock_irqsave(&lockres->l_lock, flags);
 
        mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
@@ -2234,8 +2283,12 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
                spin_lock_irqsave(&lockres->l_lock, flags);
        }
 
-       if (dcb)
-               dcb->drop_func(lockres, dcb->drop_data);
+       if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
+               if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
+                   lockres->l_level == LKM_EXMODE &&
+                   !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
+                       lockres->l_ops->set_lvb(lockres);
+       }
 
        if (lockres->l_flags & OCFS2_LOCK_BUSY)
                mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
@@ -2261,8 +2314,8 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
 
        mlog(0, "lock %s\n", lockres->l_name);
 
-       status = dlmunlock(osb->dlm, &lockres->l_lksb, LKM_VALBLK,
-                          lockres->l_ops->unlock_ast, lockres);
+       status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
+                          ocfs2_unlock_ast, lockres);
        if (status != DLM_NORMAL) {
                ocfs2_log_dlm_error("dlmunlock", status, lockres);
                mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
@@ -2309,43 +2362,26 @@ void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 }
 
-static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
+void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
+                              struct ocfs2_lock_res *lockres)
 {
-       int status;
-
-       mlog_entry_void();
-
-       ocfs2_mark_lockres_freeing(&osb->osb_super_lockres);
-
-       status = ocfs2_drop_lock(osb, &osb->osb_super_lockres, NULL);
-       if (status < 0)
-               mlog_errno(status);
-
-       ocfs2_mark_lockres_freeing(&osb->osb_rename_lockres);
-
-       status = ocfs2_drop_lock(osb, &osb->osb_rename_lockres, NULL);
-       if (status < 0)
-               mlog_errno(status);
+       int ret;
 
-       mlog_exit(status);
+       ocfs2_mark_lockres_freeing(lockres);
+       ret = ocfs2_drop_lock(osb, lockres);
+       if (ret)
+               mlog_errno(ret);
 }
 
-static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
+static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
 {
-       struct inode *inode = data;
-
-       /* the metadata lock requires a bit more work as we have an
-        * LVB to worry about. */
-       if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
-           lockres->l_level == LKM_EXMODE &&
-           !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
-               __ocfs2_stuff_meta_lvb(inode);
+       ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
+       ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
 }
 
 int ocfs2_drop_inode_locks(struct inode *inode)
 {
        int status, err;
-       struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };
 
        mlog_entry_void();
 
@@ -2353,24 +2389,21 @@ int ocfs2_drop_inode_locks(struct inode *inode)
         * ocfs2_clear_inode has done it for us. */
 
        err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
-                             &OCFS2_I(inode)->ip_data_lockres,
-                             NULL);
+                             &OCFS2_I(inode)->ip_data_lockres);
        if (err < 0)
                mlog_errno(err);
 
        status = err;
 
        err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
-                             &OCFS2_I(inode)->ip_meta_lockres,
-                             &meta_dcb);
+                             &OCFS2_I(inode)->ip_meta_lockres);
        if (err < 0)
                mlog_errno(err);
        if (err < 0 && !status)
                status = err;
 
        err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
-                             &OCFS2_I(inode)->ip_rw_lockres,
-                             NULL);
+                             &OCFS2_I(inode)->ip_rw_lockres);
        if (err < 0)
                mlog_errno(err);
        if (err < 0 && !status)
@@ -2419,9 +2452,10 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
                         &lockres->l_lksb,
                         dlm_flags,
                         lockres->l_name,
-                        lockres->l_ops->ast,
+                        OCFS2_LOCK_ID_MAX_LEN - 1,
+                        ocfs2_locking_ast,
                         lockres,
-                        lockres->l_ops->bast);
+                        ocfs2_blocking_ast);
        if (status != DLM_NORMAL) {
                ocfs2_log_dlm_error("dlmlock", status, lockres);
                ret = -EINVAL;
@@ -2480,7 +2514,7 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
        status = dlmunlock(osb->dlm,
                           &lockres->l_lksb,
                           LKM_CANCEL,
-                          lockres->l_ops->unlock_ast,
+                          ocfs2_unlock_ast,
                           lockres);
        if (status != DLM_NORMAL) {
                ocfs2_log_dlm_error("dlmunlock", status, lockres);
@@ -2494,115 +2528,15 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
        return ret;
 }
 
-static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
-                                                 struct ocfs2_lock_res *lockres,
-                                                 int new_level)
-{
-       int ret;
-
-       mlog_entry_void();
-
-       BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
-
-       if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
-               ret = 0;
-               mlog(0, "lockres %s currently being refreshed -- backing "
-                    "off!\n", lockres->l_name);
-       } else if (new_level == LKM_PRMODE)
-               ret = !lockres->l_ex_holders &&
-                       ocfs2_inode_fully_checkpointed(inode);
-       else /* Must be NLMODE we're converting to. */
-               ret = !lockres->l_ro_holders && !lockres->l_ex_holders &&
-                       ocfs2_inode_fully_checkpointed(inode);
-
-       mlog_exit(ret);
-       return ret;
-}
-
-static int ocfs2_do_unblock_meta(struct inode *inode,
-                                int *requeue)
-{
-       int new_level;
-       int set_lvb = 0;
-       int ret = 0;
-       struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
-       unsigned long flags;
-
-       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-
-       mlog_entry_void();
-
-       spin_lock_irqsave(&lockres->l_lock, flags);
-
-       BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
-
-       mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level,
-            lockres->l_blocking);
-
-       BUG_ON(lockres->l_level != LKM_EXMODE &&
-              lockres->l_level != LKM_PRMODE);
-
-       if (lockres->l_flags & OCFS2_LOCK_BUSY) {
-               *requeue = 1;
-               ret = ocfs2_prepare_cancel_convert(osb, lockres);
-               spin_unlock_irqrestore(&lockres->l_lock, flags);
-               if (ret) {
-                       ret = ocfs2_cancel_convert(osb, lockres);
-                       if (ret < 0)
-                               mlog_errno(ret);
-               }
-               goto leave;
-       }
-
-       new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
-
-       mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n",
-            lockres->l_level, lockres->l_blocking, new_level);
-
-       if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) {
-               if (lockres->l_level == LKM_EXMODE)
-                       set_lvb = 1;
-
-               /* If the lock hasn't been refreshed yet (rare), then
-                * our memory inode values are old and we skip
-                * stuffing the lvb. There's no need to actually clear
-                * out the lvb here as it's value is still valid. */
-               if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
-                       if (set_lvb)
-                               __ocfs2_stuff_meta_lvb(inode);
-               } else
-                       mlog(0, "lockres %s: downconverting stale lock!\n",
-                            lockres->l_name);
-
-               mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, "
-                    "l_blocking=%d, new_level=%d\n",
-                    lockres->l_level, lockres->l_blocking, new_level);
-
-               ocfs2_prepare_downconvert(lockres, new_level);
-               spin_unlock_irqrestore(&lockres->l_lock, flags);
-               ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
-               goto leave;
-       }
-       if (!ocfs2_inode_fully_checkpointed(inode))
-               ocfs2_start_checkpoint(osb);
-
-       *requeue = 1;
-       spin_unlock_irqrestore(&lockres->l_lock, flags);
-       ret = 0;
-leave:
-       mlog_exit(ret);
-       return ret;
-}
-
-static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
-                                     struct ocfs2_lock_res *lockres,
-                                     int *requeue,
-                                     ocfs2_convert_worker_t *worker)
+static int ocfs2_unblock_lock(struct ocfs2_super *osb,
+                             struct ocfs2_lock_res *lockres,
+                             struct ocfs2_unblock_ctl *ctl)
 {
        unsigned long flags;
        int blocking;
        int new_level;
        int ret = 0;
+       int set_lvb = 0;
 
        mlog_entry_void();
 
@@ -2612,7 +2546,7 @@ static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
 
 recheck:
        if (lockres->l_flags & OCFS2_LOCK_BUSY) {
-               *requeue = 1;
+               ctl->requeue = 1;
                ret = ocfs2_prepare_cancel_convert(osb, lockres);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                if (ret) {
@@ -2626,27 +2560,33 @@ recheck:
        /* if we're blocking an exclusive and we have *any* holders,
         * then requeue. */
        if ((lockres->l_blocking == LKM_EXMODE)
-           && (lockres->l_ex_holders || lockres->l_ro_holders)) {
-               spin_unlock_irqrestore(&lockres->l_lock, flags);
-               *requeue = 1;
-               ret = 0;
-               goto leave;
-       }
+           && (lockres->l_ex_holders || lockres->l_ro_holders))
+               goto leave_requeue;
 
        /* If it's a PR we're blocking, then only
         * requeue if we've got any EX holders */
        if (lockres->l_blocking == LKM_PRMODE &&
-           lockres->l_ex_holders) {
-               spin_unlock_irqrestore(&lockres->l_lock, flags);
-               *requeue = 1;
-               ret = 0;
-               goto leave;
-       }
+           lockres->l_ex_holders)
+               goto leave_requeue;
+
+       /*
+        * Can we get a lock in this state if the holder counts are
+        * zero? The meta data unblock code used to check this.
+        */
+       if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
+           && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
+               goto leave_requeue;
+
+       new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
+
+       if (lockres->l_ops->check_downconvert
+           && !lockres->l_ops->check_downconvert(lockres, new_level))
+               goto leave_requeue;
 
        /* If we get here, then we know that there are no more
         * incompatible holders (and anyone asking for an incompatible
         * lock is blocked). We can now downconvert the lock */
-       if (!worker)
+       if (!lockres->l_ops->downconvert_worker)
                goto downconvert;
 
        /* Some lockres types want to do a bit of work before
@@ -2656,7 +2596,10 @@ recheck:
        blocking = lockres->l_blocking;
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       worker(lockres, blocking);
+       ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
+
+       if (ctl->unblock_action == UNBLOCK_STOP_POST)
+               goto leave;
 
        spin_lock_irqsave(&lockres->l_lock, flags);
        if (blocking != lockres->l_blocking) {
@@ -2666,25 +2609,43 @@ recheck:
        }
 
 downconvert:
-       *requeue = 0;
-       new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
+       ctl->requeue = 0;
+
+       if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
+               if (lockres->l_level == LKM_EXMODE)
+                       set_lvb = 1;
+
+               /*
+                * We only set the lvb if the lock has been fully
+                * refreshed - otherwise we risk setting stale
+                * data. Otherwise, there's no need to actually clear
+                * out the lvb here as it's value is still valid.
+                */
+               if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
+                       lockres->l_ops->set_lvb(lockres);
+       }
 
        ocfs2_prepare_downconvert(lockres, new_level);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
-       ret = ocfs2_downconvert_lock(osb, lockres, new_level, 0);
+       ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
 leave:
        mlog_exit(ret);
        return ret;
+
+leave_requeue:
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
+       ctl->requeue = 1;
+
+       mlog_exit(0);
+       return 0;
 }
 
-static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
-                                     int blocking)
+static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
+                                    int blocking)
 {
        struct inode *inode;
        struct address_space *mapping;
 
-       mlog_entry_void();
-
                inode = ocfs2_lock_res_inode(lockres);
        mapping = inode->i_mapping;
 
@@ -2705,116 +2666,159 @@ static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
                filemap_fdatawait(mapping);
        }
 
-       mlog_exit_void();
+       return UNBLOCK_CONTINUE;
 }
 
-int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
-                      int *requeue)
+static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
+                                       int new_level)
 {
-       int status;
-       struct inode *inode;
-       struct ocfs2_super *osb;
-
-       mlog_entry_void();
-
-       inode = ocfs2_lock_res_inode(lockres);
-       osb = OCFS2_SB(inode->i_sb);
-
-       mlog(0, "unblock inode %llu\n",
-            (unsigned long long)OCFS2_I(inode)->ip_blkno);
+       struct inode *inode = ocfs2_lock_res_inode(lockres);
+       int checkpointed = ocfs2_inode_fully_checkpointed(inode);
 
-       status = ocfs2_generic_unblock_lock(osb,
-                                           lockres,
-                                           requeue,
-                                           ocfs2_data_convert_worker);
-       if (status < 0)
-               mlog_errno(status);
+       BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
+       BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
 
-       mlog(0, "inode %llu, requeue = %d\n",
-            (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue);
+       if (checkpointed)
+               return 1;
 
-       mlog_exit(status);
-       return status;
+       ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
+       return 0;
 }
 
-static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
-                                   int *requeue)
+static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
 {
-       int status;
-       struct inode *inode;
-
-       mlog_entry_void();
-
-       mlog(0, "Unblock lockres %s\n", lockres->l_name);
-
-       inode  = ocfs2_lock_res_inode(lockres);
+       struct inode *inode = ocfs2_lock_res_inode(lockres);
 
-       status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
-                                           lockres,
-                                           requeue,
-                                           NULL);
-       if (status < 0)
-               mlog_errno(status);
-
-       mlog_exit(status);
-       return status;
+       __ocfs2_stuff_meta_lvb(inode);
 }
 
-
-int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
-                      int *requeue)
+/*
+ * Does the final reference drop on our dentry lock. Right now this
+ * happens in the vote thread, but we could choose to simplify the
+ * dlmglue API and push these off to the ocfs2_wq in the future.
+ */
+static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
+                                    struct ocfs2_lock_res *lockres)
 {
-       int status;
-       struct inode *inode;
-
-       mlog_entry_void();
+       struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
+       ocfs2_dentry_lock_put(osb, dl);
+}
 
-               inode = ocfs2_lock_res_inode(lockres);
+/*
+ * d_delete() matching dentries before the lock downconvert.
+ *
+ * At this point, any process waiting to destroy the
+ * dentry_lock due to last ref count is stopped by the
+ * OCFS2_LOCK_QUEUED flag.
+ *
+ * We have two potential problems
+ *
+ * 1) If we do the last reference drop on our dentry_lock (via dput)
+ *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
+ *    the downconvert to finish. Instead we take an elevated
+ *    reference and push the drop until after we've completed our
+ *    unblock processing.
+ *
+ * 2) There might be another process with a final reference,
+ *    waiting on us to finish processing. If this is the case, we
+ *    detect it and exit out - there's no more dentries anyway.
+ */
+static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
+                                      int blocking)
+{
+       struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
+       struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
+       struct dentry *dentry;
+       unsigned long flags;
+       int extra_ref = 0;
 
-       mlog(0, "unblock inode %llu\n",
-            (unsigned long long)OCFS2_I(inode)->ip_blkno);
+       /*
+        * This node is blocking another node from getting a read
+        * lock. This happens when we've renamed within a
+        * directory. We've forced the other nodes to d_delete(), but
+        * we never actually dropped our lock because it's still
+        * valid. The downconvert code will retain a PR for this node,
+        * so there's no further work to do.
+        */
+       if (blocking == LKM_PRMODE)
+               return UNBLOCK_CONTINUE;
 
-       status = ocfs2_do_unblock_meta(inode, requeue);
-       if (status < 0)
-               mlog_errno(status);
+       /*
+        * Mark this inode as potentially orphaned. The code in
+        * ocfs2_delete_inode() will figure out whether it actually
+        * needs to be freed or not.
+        */
+       spin_lock(&oi->ip_lock);
+       oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
+       spin_unlock(&oi->ip_lock);
 
-       mlog(0, "inode %llu, requeue = %d\n",
-            (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue);
+       /*
+        * Yuck. We need to make sure however that the check of
+        * OCFS2_LOCK_FREEING and the extra reference are atomic with
+        * respect to a reference decrement or the setting of that
+        * flag.
+        */
+       spin_lock_irqsave(&lockres->l_lock, flags);
+       spin_lock(&dentry_attach_lock);
+       if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
+           && dl->dl_count) {
+               dl->dl_count++;
+               extra_ref = 1;
+       }
+       spin_unlock(&dentry_attach_lock);
+       spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       mlog_exit(status);
-       return status;
-}
+       mlog(0, "extra_ref = %d\n", extra_ref);
 
-/* Generic unblock function for any lockres whose private data is an
- * ocfs2_super pointer. */
-static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
-                                 int *requeue)
-{
-       int status;
-       struct ocfs2_super *osb;
+       /*
+        * We have a process waiting on us in ocfs2_dentry_iput(),
+        * which means we can't have any more outstanding
+        * aliases. There's no need to do any more work.
+        */
+       if (!extra_ref)
+               return UNBLOCK_CONTINUE;
+
+       spin_lock(&dentry_attach_lock);
+       while (1) {
+               dentry = ocfs2_find_local_alias(dl->dl_inode,
+                                               dl->dl_parent_blkno, 1);
+               if (!dentry)
+                       break;
+               spin_unlock(&dentry_attach_lock);
 
-       mlog_entry_void();
+               mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
+                    dentry->d_name.name);
 
-       mlog(0, "Unblock lockres %s\n", lockres->l_name);
+               /*
+                * The following dcache calls may do an
+                * iput(). Normally we don't want that from the
+                * downconverting thread, but in this case it's ok
+                * because the requesting node already has an
+                * exclusive lock on the inode, so it can't be queued
+                * for a downconvert.
+                */
+               d_delete(dentry);
+               dput(dentry);
 
-       osb = ocfs2_lock_res_super(lockres);
+               spin_lock(&dentry_attach_lock);
+       }
+       spin_unlock(&dentry_attach_lock);
 
-       status = ocfs2_generic_unblock_lock(osb,
-                                           lockres,
-                                           requeue,
-                                           NULL);
-       if (status < 0)
-               mlog_errno(status);
+       /*
+        * If we are the last holder of this dentry lock, there is no
+        * reason to downconvert so skip straight to the unlock.
+        */
+       if (dl->dl_count == 1)
+               return UNBLOCK_STOP_POST;
 
-       mlog_exit(status);
-       return status;
+       return UNBLOCK_CONTINUE_POST;
 }
 
 void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
                                struct ocfs2_lock_res *lockres)
 {
        int status;
-       int requeue = 0;
+       struct ocfs2_unblock_ctl ctl = {0, 0,};
        unsigned long flags;
 
        /* Our reference to the lockres in this function can be
@@ -2825,7 +2829,6 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
 
        BUG_ON(!lockres);
        BUG_ON(!lockres->l_ops);
-       BUG_ON(!lockres->l_ops->unblock);
 
        mlog(0, "lockres %s blocked.\n", lockres->l_name);
 
@@ -2839,21 +2842,25 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
                goto unqueue;
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-       status = lockres->l_ops->unblock(lockres, &requeue);
+       status = ocfs2_unblock_lock(osb, lockres, &ctl);
        if (status < 0)
                mlog_errno(status);
 
        spin_lock_irqsave(&lockres->l_lock, flags);
 unqueue:
-       if (lockres->l_flags & OCFS2_LOCK_FREEING || !requeue) {
+       if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
                lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
        } else
                ocfs2_schedule_blocked_lock(osb, lockres);
 
        mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
-            requeue ? "yes" : "no");
+            ctl.requeue ? "yes" : "no");
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
+       if (ctl.unblock_action != UNBLOCK_CONTINUE
+           && lockres->l_ops->post_unlock)
+               lockres->l_ops->post_unlock(osb, lockres);
+
        mlog_exit_void();
 }
 
@@ -2896,8 +2903,9 @@ void ocfs2_dump_meta_lvb_info(u64 level,
 
        mlog(level, "LVB information for %s (called from %s:%u):\n",
             lockres->l_name, function, line);
-       mlog(level, "version: %u, clusters: %u\n",
-            be32_to_cpu(lvb->lvb_version), be32_to_cpu(lvb->lvb_iclusters));
+       mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
+            lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
+            be32_to_cpu(lvb->lvb_igeneration));
        mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
             (unsigned long long)be64_to_cpu(lvb->lvb_isize),
             be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
index 243ae86..4a27693 100644 (file)
 #ifndef DLMGLUE_H
 #define DLMGLUE_H
 
-#define OCFS2_LVB_VERSION 3
+#include "dcache.h"
+
+#define OCFS2_LVB_VERSION 4
 
 struct ocfs2_meta_lvb {
-       __be32       lvb_version;
+       __u8         lvb_version;
+       __u8         lvb_reserved0;
+       __be16       lvb_reserved1;
        __be32       lvb_iclusters;
        __be32       lvb_iuid;
        __be32       lvb_igid;
@@ -41,7 +45,8 @@ struct ocfs2_meta_lvb {
        __be16       lvb_imode;
        __be16       lvb_inlink;
        __be32       lvb_iattr;
-       __be32       lvb_reserved[2];
+       __be32       lvb_igeneration;
+       __be32       lvb_reserved2;
 };
 
 /* ocfs2_meta_lock_full() and ocfs2_data_lock_full() 'arg_flags' flags */
@@ -57,9 +62,14 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb);
 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res);
 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
                               enum ocfs2_lock_type type,
+                              unsigned int generation,
                               struct inode *inode);
+void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
+                               u64 parent, struct inode *inode);
 void ocfs2_lock_res_free(struct ocfs2_lock_res *res);
 int ocfs2_create_new_inode_locks(struct inode *inode);
+int ocfs2_create_new_lock(struct ocfs2_super *osb,
+                         struct ocfs2_lock_res *lockres, int ex, int local);
 int ocfs2_drop_inode_locks(struct inode *inode);
 int ocfs2_data_lock_full(struct inode *inode,
                         int write,
@@ -93,7 +103,12 @@ void ocfs2_super_unlock(struct ocfs2_super *osb,
                        int ex);
 int ocfs2_rename_lock(struct ocfs2_super *osb);
 void ocfs2_rename_unlock(struct ocfs2_super *osb);
+int ocfs2_dentry_lock(struct dentry *dentry, int ex);
+void ocfs2_dentry_unlock(struct dentry *dentry, int ex);
+
 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres);
+void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
+                              struct ocfs2_lock_res *lockres);
 
 /* for the vote thread */
 void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
index ec55ab3..fb91089 100644 (file)
@@ -33,6 +33,7 @@
 
 #include "dir.h"
 #include "dlmglue.h"
+#include "dcache.h"
 #include "export.h"
 #include "inode.h"
 
@@ -57,7 +58,7 @@ static struct dentry *ocfs2_get_dentry(struct super_block *sb, void *vobjp)
                return ERR_PTR(-ESTALE);
        }
 
-       inode = ocfs2_iget(OCFS2_SB(sb), handle->ih_blkno);
+       inode = ocfs2_iget(OCFS2_SB(sb), handle->ih_blkno, 0);
 
        if (IS_ERR(inode)) {
                mlog_errno(PTR_ERR(inode));
@@ -77,6 +78,7 @@ static struct dentry *ocfs2_get_dentry(struct super_block *sb, void *vobjp)
                mlog_errno(-ENOMEM);
                return ERR_PTR(-ENOMEM);
        }
+       result->d_op = &ocfs2_dentry_ops;
 
        mlog_exit_ptr(result);
        return result;
@@ -113,7 +115,7 @@ static struct dentry *ocfs2_get_parent(struct dentry *child)
                goto bail_unlock;
        }
 
-       inode = ocfs2_iget(OCFS2_SB(dir->i_sb), blkno);
+       inode = ocfs2_iget(OCFS2_SB(dir->i_sb), blkno, 0);
        if (IS_ERR(inode)) {
                mlog(ML_ERROR, "Unable to create inode %llu\n",
                     (unsigned long long)blkno);
@@ -127,6 +129,8 @@ static struct dentry *ocfs2_get_parent(struct dentry *child)
                parent = ERR_PTR(-ENOMEM);
        }
 
+       parent->d_op = &ocfs2_dentry_ops;
+
 bail_unlock:
        ocfs2_meta_unlock(dir, 0);
 
index 7bcf691..69d3db5 100644 (file)
@@ -54,8 +54,6 @@
 
 #include "buffer_head_io.h"
 
-#define OCFS2_FI_FLAG_NOWAIT   0x1
-#define OCFS2_FI_FLAG_DELETE   0x2
 struct ocfs2_find_inode_args
 {
        u64             fi_blkno;
@@ -109,7 +107,7 @@ struct inode *ocfs2_ilookup_for_vote(struct ocfs2_super *osb,
        return ilookup5(osb->sb, args.fi_ino, ocfs2_find_actor, &args);
 }
 
-struct inode *ocfs2_iget(struct ocfs2_super *osb, u64 blkno)
+struct inode *ocfs2_iget(struct ocfs2_super *osb, u64 blkno, int flags)
 {
        struct inode *inode = NULL;
        struct super_block *sb = osb->sb;
@@ -127,7 +125,7 @@ struct inode *ocfs2_iget(struct ocfs2_super *osb, u64 blkno)
        }
 
        args.fi_blkno = blkno;
-       args.fi_flags = 0;
+       args.fi_flags = flags;
        args.fi_ino = ino_from_blkno(sb, blkno);
 
        inode = iget5_locked(sb, args.fi_ino, ocfs2_find_actor,
@@ -297,15 +295,11 @@ int ocfs2_populate_inode(struct inode *inode, struct ocfs2_dinode *fe,
        OCFS2_I(inode)->ip_orphaned_slot = OCFS2_INVALID_SLOT;
        OCFS2_I(inode)->ip_attr = le32_to_cpu(fe->i_attr);
 
-       if (create_ino)
-               inode->i_ino = ino_from_blkno(inode->i_sb,
-                              le64_to_cpu(fe->i_blkno));
-
-       mlog(0, "blkno = %llu, ino = %lu, create_ino = %s\n",
-            (unsigned long long)fe->i_blkno, inode->i_ino, create_ino ? "true" : "false");
-
        inode->i_nlink = le16_to_cpu(fe->i_links_count);
 
+       if (fe->i_flags & cpu_to_le32(OCFS2_SYSTEM_FL))
+               OCFS2_I(inode)->ip_flags |= OCFS2_INODE_SYSTEM_FILE;
+
        if (fe->i_flags & cpu_to_le32(OCFS2_LOCAL_ALLOC_FL)) {
                OCFS2_I(inode)->ip_flags |= OCFS2_INODE_BITMAP;
                mlog(0, "local alloc inode: i_ino=%lu\n", inode->i_ino);
@@ -343,12 +337,28 @@ int ocfs2_populate_inode(struct inode *inode, struct ocfs2_dinode *fe,
                    break;
        }
 
+       if (create_ino) {
+               inode->i_ino = ino_from_blkno(inode->i_sb,
+                              le64_to_cpu(fe->i_blkno));
+
+               /*
+                * If we ever want to create system files from kernel,
+                * the generation argument to
+                * ocfs2_inode_lock_res_init() will have to change.
+                */
+               BUG_ON(fe->i_flags & cpu_to_le32(OCFS2_SYSTEM_FL));
+
+               ocfs2_inode_lock_res_init(&OCFS2_I(inode)->ip_meta_lockres,
+                                         OCFS2_LOCK_TYPE_META, 0, inode);
+       }
+
        ocfs2_inode_lock_res_init(&OCFS2_I(inode)->ip_rw_lockres,
-                                 OCFS2_LOCK_TYPE_RW, inode);
-       ocfs2_inode_lock_res_init(&OCFS2_I(inode)->ip_meta_lockres,
-                                 OCFS2_LOCK_TYPE_META, inode);
+                                 OCFS2_LOCK_TYPE_RW, inode->i_generation,
+                                 inode);
+
        ocfs2_inode_lock_res_init(&OCFS2_I(inode)->ip_data_lockres,
-                                 OCFS2_LOCK_TYPE_DATA, inode);
+                                 OCFS2_LOCK_TYPE_DATA, inode->i_generation,
+                                 inode);
 
        ocfs2_set_inode_flags(inode);
        inode->i_flags |= S_NOATIME;
@@ -366,15 +376,15 @@ static int ocfs2_read_locked_inode(struct inode *inode,
        struct ocfs2_super *osb;
        struct ocfs2_dinode *fe;
        struct buffer_head *bh = NULL;
-       int status;
-       int sysfile = 0;
+       int status, can_lock;
+       u32 generation = 0;
 
        mlog_entry("(0x%p, 0x%p)\n", inode, args);
 
        status = -EINVAL;
        if (inode == NULL || inode->i_sb == NULL) {
                mlog(ML_ERROR, "bad inode\n");
-               goto bail;
+               return status;
        }
        sb = inode->i_sb;
        osb = OCFS2_SB(sb);
@@ -382,50 +392,110 @@ static int ocfs2_read_locked_inode(struct inode *inode,
        if (!args) {
                mlog(ML_ERROR, "bad inode args\n");
                make_bad_inode(inode);
-               goto bail;
+               return status;
+       }
+
+       /*
+        * To improve performance of cold-cache inode stats, we take
+        * the cluster lock here if possible.
+        *
+        * Generally, OCFS2 never trusts the contents of an inode
+        * unless it's holding a cluster lock, so taking it here isn't
+        * a correctness issue as much as it is a performance
+        * improvement.
+        *
+        * There are three times when taking the lock is not a good idea:
+        *
+        * 1) During startup, before we have initialized the DLM.
+        *
+        * 2) If we are reading certain system files which never get
+        *    cluster locks (local alloc, truncate log).
+        *
+        * 3) If the process doing the iget() is responsible for
+        *    orphan dir recovery. We're holding the orphan dir lock and
+        *    can get into a deadlock with another process on another
+        *    node in ->delete_inode().
+        *
+        * #1 and #2 can be simply solved by never taking the lock
+        * here for system files (which are the only type we read
+        * during mount). It's a heavier approach, but our main
+        * concern is user-accesible files anyway.
+        *
+        * #3 works itself out because we'll eventually take the
+        * cluster lock before trusting anything anyway.
+        */
+       can_lock = !(args->fi_flags & OCFS2_FI_FLAG_SYSFILE)
+               && !(args->fi_flags & OCFS2_FI_FLAG_NOLOCK);
+
+       /*
+        * To maintain backwards compatibility with older versions of
+        * ocfs2-tools, we still store the generation value for system
+        * files. The only ones that actually matter to userspace are
+        * the journals, but it's easier and inexpensive to just flag
+        * all system files similarly.
+        */
+       if (args->fi_flags & OCFS2_FI_FLAG_SYSFILE)
+               generation = osb->fs_generation;
+
+       ocfs2_inode_lock_res_init(&OCFS2_I(inode)->ip_meta_lockres,
+                                 OCFS2_LOCK_TYPE_META,
+                                 generation, inode);
+
+       if (can_lock) {
+               status = ocfs2_meta_lock(inode, NULL, NULL, 0);
+               if (status) {
+                       make_bad_inode(inode);
+                       mlog_errno(status);
+                       return status;
+               }
        }
 
-       /* Read the FE off disk. This is safe because the kernel only
-        * does one read_inode2 for a new inode, and if it doesn't
-        * exist yet then nobody can be working on it! */
-       status = ocfs2_read_block(osb, args->fi_blkno, &bh, 0, NULL);
+       status = ocfs2_read_block(osb, args->fi_blkno, &bh, 0,
+                                 can_lock ? inode : NULL);
        if (status < 0) {
                mlog_errno(status);
-               make_bad_inode(inode);
                goto bail;
        }
 
+       status = -EINVAL;
        fe = (struct ocfs2_dinode *) bh->b_data;
        if (!OCFS2_IS_VALID_DINODE(fe)) {
                mlog(ML_ERROR, "Invalid dinode #%llu: signature = %.*s\n",
                     (unsigned long long)fe->i_blkno, 7, fe->i_signature);
-               make_bad_inode(inode);
                goto bail;
        }
 
-       if (fe->i_flags & cpu_to_le32(OCFS2_SYSTEM_FL))
-               sysfile = 1;
+       /*
+        * This is a code bug. Right now the caller needs to
+        * understand whether it is asking for a system file inode or
+        * not so the proper lock names can be built.
+        */
+       mlog_bug_on_msg(!!(fe->i_flags & cpu_to_le32(OCFS2_SYSTEM_FL)) !=
+                       !!(args->fi_flags & OCFS2_FI_FLAG_SYSFILE),
+                       "Inode %llu: system file state is ambigous\n",
+                       (unsigned long long)args->fi_blkno);
 
        if (S_ISCHR(le16_to_cpu(fe->i_mode)) ||
            S_ISBLK(le16_to_cpu(fe->i_mode)))
                inode->i_rdev = huge_decode_dev(le64_to_cpu(fe->id1.dev1.i_rdev));
 
-       status = -EINVAL;
        if (ocfs2_populate_inode(inode, fe, 0) < 0) {
                mlog(ML_ERROR, "populate failed! i_blkno=%llu, i_ino=%lu\n",
                     (unsigned long long)fe->i_blkno, inode->i_ino);
-               make_bad_inode(inode);
                goto bail;
        }
 
        BUG_ON(args->fi_blkno != le64_to_cpu(fe->i_blkno));
 
-       if (sysfile)
-              OCFS2_I(inode)->ip_flags |= OCFS2_INODE_SYSTEM_FILE;
-
        status = 0;
 
 bail:
+       if (can_lock)
+               ocfs2_meta_unlock(inode, 0);
+
+       if (status < 0)
+               make_bad_inode(inode);
+
        if (args && bh)
                brelse(bh);
 
@@ -898,9 +968,15 @@ void ocfs2_delete_inode(struct inode *inode)
                goto bail_unlock_inode;
        }
 
-       /* Mark the inode as successfully deleted. This is important
-        * for ocfs2_clear_inode as it will check this flag and skip
-        * any checkpointing work */
+       /*
+        * Mark the inode as successfully deleted.
+        *
+        * This is important for ocfs2_clear_inode() as it will check
+        * this flag and skip any checkpointing work
+        *
+        * ocfs2_stuff_meta_lvb() also uses this flag to invalidate
+        * the LVB for other nodes.
+        */
        OCFS2_I(inode)->ip_flags |= OCFS2_INODE_DELETED;
 
 bail_unlock_inode:
@@ -1025,12 +1101,10 @@ void ocfs2_drop_inode(struct inode *inode)
        /* Testing ip_orphaned_slot here wouldn't work because we may
         * not have gotten a delete_inode vote from any other nodes
         * yet. */
-       if (oi->ip_flags & OCFS2_INODE_MAYBE_ORPHANED) {
-               mlog(0, "Inode was orphaned on another node, clearing nlink.\n");
-               inode->i_nlink = 0;
-       }
-
-       generic_drop_inode(inode);
+       if (oi->ip_flags & OCFS2_INODE_MAYBE_ORPHANED)
+               generic_delete_inode(inode);
+       else
+               generic_drop_inode(inode);
 
        mlog_exit_void();
 }
index 4d1e539..9957810 100644 (file)
@@ -122,7 +122,13 @@ struct buffer_head *ocfs2_bread(struct inode *inode, int block,
 void ocfs2_clear_inode(struct inode *inode);
 void ocfs2_delete_inode(struct inode *inode);
 void ocfs2_drop_inode(struct inode *inode);
-struct inode *ocfs2_iget(struct ocfs2_super *osb, u64 feoff);
+
+/* Flags for ocfs2_iget() */
+#define OCFS2_FI_FLAG_NOWAIT   0x1
+#define OCFS2_FI_FLAG_DELETE   0x2
+#define OCFS2_FI_FLAG_SYSFILE  0x4
+#define OCFS2_FI_FLAG_NOLOCK   0x8
+struct inode *ocfs2_iget(struct ocfs2_super *osb, u64 feoff, int flags);
 struct inode *ocfs2_ilookup_for_vote(struct ocfs2_super *osb,
                                     u64 blkno,
                                     int delete_vote);
index f92bf1d..fd9734d 100644 (file)
@@ -1493,7 +1493,8 @@ static int ocfs2_queue_orphans(struct ocfs2_super *osb,
                        if (de->name_len == 2 && !strncmp("..", de->name, 2))
                                continue;
 
-                       iter = ocfs2_iget(osb, le64_to_cpu(de->inode));
+                       iter = ocfs2_iget(osb, le64_to_cpu(de->inode),
+                                         OCFS2_FI_FLAG_NOLOCK);
                        if (IS_ERR(iter))
                                continue;
 
index 0d3e939..849c3b4 100644 (file)
@@ -179,7 +179,7 @@ static struct dentry *ocfs2_lookup(struct inode *dir, struct dentry *dentry,
        if (status < 0)
                goto bail_add;
 
-       inode = ocfs2_iget(OCFS2_SB(dir->i_sb), blkno);
+       inode = ocfs2_iget(OCFS2_SB(dir->i_sb), blkno, 0);
        if (IS_ERR(inode)) {
                mlog(ML_ERROR, "Unable to create inode %llu\n",
                     (unsigned long long)blkno);
@@ -199,10 +199,32 @@ static struct dentry *ocfs2_lookup(struct inode *dir, struct dentry *dentry,
        spin_unlock(&oi->ip_lock);
 
 bail_add:
-
        dentry->d_op = &ocfs2_dentry_ops;
        ret = d_splice_alias(inode, dentry);
 
+       if (inode) {
+               /*
+                * If d_splice_alias() finds a DCACHE_DISCONNECTED
+                * dentry, it will d_move() it on top of ourse. The
+                * return value will indicate this however, so in
+                * those cases, we switch them around for the locking
+                * code.
+                *
+                * NOTE: This dentry already has ->d_op set from
+                * ocfs2_get_parent() and ocfs2_get_dentry()
+                */
+               if (ret)
+                       dentry = ret;
+
+               status = ocfs2_dentry_attach_lock(dentry, inode,
+                                                 OCFS2_I(dir)->ip_blkno);
+               if (status) {
+                       mlog_errno(status);
+                       ret = ERR_PTR(status);
+                       goto bail_unlock;
+               }
+       }
+
 bail_unlock:
        /* Don't drop the cluster lock until *after* the d_add --
         * unlink on another node will message us to remove that
@@ -418,6 +440,13 @@ static int ocfs2_mknod(struct inode *dir,
                goto leave;
        }
 
+       status = ocfs2_dentry_attach_lock(dentry, inode,
+                                         OCFS2_I(dir)->ip_blkno);
+       if (status) {
+               mlog_errno(status);
+               goto leave;
+       }
+
        insert_inode_hash(inode);
        dentry->d_op = &ocfs2_dentry_ops;
        d_instantiate(dentry, inode);
@@ -725,6 +754,12 @@ static int ocfs2_link(struct dentry *old_dentry,
                goto bail;
        }
 
+       err = ocfs2_dentry_attach_lock(dentry, inode, OCFS2_I(dir)->ip_blkno);
+       if (err) {
+               mlog_errno(err);
+               goto bail;
+       }
+
        atomic_inc(&inode->i_count);
        dentry->d_op = &ocfs2_dentry_ops;
        d_instantiate(dentry, inode);
@@ -743,6 +778,23 @@ bail:
        return err;
 }
 
+/*
+ * Takes and drops an exclusive lock on the given dentry. This will
+ * force other nodes to drop it.
+ */
+static int ocfs2_remote_dentry_delete(struct dentry *dentry)
+{
+       int ret;
+
+       ret = ocfs2_dentry_lock(dentry, 1);
+       if (ret)
+               mlog_errno(ret);
+       else
+               ocfs2_dentry_unlock(dentry, 1);
+
+       return ret;
+}
+
 static int ocfs2_unlink(struct inode *dir,
                        struct dentry *dentry)
 {
@@ -832,8 +884,7 @@ static int ocfs2_unlink(struct inode *dir,
        else
                inode->i_nlink--;
 
-       status = ocfs2_request_unlink_vote(inode, dentry,
-                                          (unsigned int) inode->i_nlink);
+       status = ocfs2_remote_dentry_delete(dentry);
        if (status < 0) {
                /* This vote should succeed under all normal
                 * circumstances. */
@@ -1019,7 +1070,6 @@ static int ocfs2_rename(struct inode *old_dir,
        struct buffer_head *old_inode_de_bh = NULL; // if old_dentry is a dir,
                                                    // this is the 1st dirent bh
        nlink_t old_dir_nlink = old_dir->i_nlink, new_dir_nlink = new_dir->i_nlink;
-       unsigned int links_count;
 
        /* At some point it might be nice to break this function up a
         * bit. */
@@ -1093,23 +1143,26 @@ static int ocfs2_rename(struct inode *old_dir,
                }
        }
 
-       if (S_ISDIR(old_inode->i_mode)) {
-               /* Directories actually require metadata updates to
-                * the directory info so we can't get away with not
-                * doing node locking on it. */
-               status = ocfs2_meta_lock(old_inode, handle, NULL, 1);
-               if (status < 0) {
-                       if (status != -ENOENT)
-                               mlog_errno(status);
-                       goto bail;
-               }
-
-               status = ocfs2_request_rename_vote(old_inode, old_dentry);
-               if (status < 0) {
+       /*
+        * Though we don't require an inode meta data update if
+        * old_inode is not a directory, we lock anyway here to ensure
+        * the vote thread on other nodes won't have to concurrently
+        * downconvert the inode and the dentry locks.
+        */
+       status = ocfs2_meta_lock(old_inode, handle, NULL, 1);
+       if (status < 0) {
+               if (status != -ENOENT)
                        mlog_errno(status);
-                       goto bail;
-               }
+               goto bail;
+       }
+
+       status = ocfs2_remote_dentry_delete(old_dentry);
+       if (status < 0) {
+               mlog_errno(status);
+               goto bail;
+       }
 
+       if (S_ISDIR(old_inode->i_mode)) {
                status = -EIO;
                old_inode_de_bh = ocfs2_bread(old_inode, 0, &status, 0);
                if (!old_inode_de_bh)
@@ -1123,14 +1176,6 @@ static int ocfs2_rename(struct inode *old_dir,
                if (!new_inode && new_dir!=old_dir &&
                    new_dir->i_nlink >= OCFS2_LINK_MAX)
                        goto bail;
-       } else {
-               /* Ah, the simple case - we're a file so just send a
-                * message. */
-               status = ocfs2_request_rename_vote(old_inode, old_dentry);
-               if (status < 0) {
-                       mlog_errno(status);
-                       goto bail;
-               }
        }
 
        status = -ENOENT;
@@ -1202,13 +1247,7 @@ static int ocfs2_rename(struct inode *old_dir,
                        goto bail;
                }
 
-               if (S_ISDIR(new_inode->i_mode))
-                       links_count = 0;
-               else
-                       links_count = (unsigned int) (new_inode->i_nlink - 1);
-
-               status = ocfs2_request_unlink_vote(new_inode, new_dentry,
-                                                  links_count);
+               status = ocfs2_remote_dentry_delete(new_dentry);
                if (status < 0) {
                        mlog_errno(status);
                        goto bail;
@@ -1387,6 +1426,7 @@ static int ocfs2_rename(struct inode *old_dir,
                }
        }
 
+       ocfs2_dentry_move(old_dentry, new_dentry, old_dir, new_dir);
        status = 0;
 bail:
        if (rename_lock)
@@ -1675,6 +1715,12 @@ static int ocfs2_symlink(struct inode *dir,
                goto bail;
        }
 
+       status = ocfs2_dentry_attach_lock(dentry, inode, OCFS2_I(dir)->ip_blkno);
+       if (status) {
+               mlog_errno(status);
+               goto bail;
+       }
+
        insert_inode_hash(inode);
        dentry->d_op = &ocfs2_dentry_ops;
        d_instantiate(dentry, inode);
index 7dd9e1e..4d5d565 100644 (file)
 #define OCFS2_LOCK_ID_MAX_LEN  32
 #define OCFS2_LOCK_ID_PAD "000000"
 
+#define OCFS2_DENTRY_LOCK_INO_START 18
+
 enum ocfs2_lock_type {
        OCFS2_LOCK_TYPE_META = 0,
        OCFS2_LOCK_TYPE_DATA,
        OCFS2_LOCK_TYPE_SUPER,
        OCFS2_LOCK_TYPE_RENAME,
        OCFS2_LOCK_TYPE_RW,
+       OCFS2_LOCK_TYPE_DENTRY,
        OCFS2_NUM_LOCK_TYPES
 };
 
@@ -63,6 +66,9 @@ static inline char ocfs2_lock_type_char(enum ocfs2_lock_type type)
                case OCFS2_LOCK_TYPE_RW:
                        c = 'W';
                        break;
+               case OCFS2_LOCK_TYPE_DENTRY:
+                       c = 'N';
+                       break;
                default:
                        c = '\0';
        }
@@ -70,4 +76,23 @@ static inline char ocfs2_lock_type_char(enum ocfs2_lock_type type)
        return c;
 }
 
+static char *ocfs2_lock_type_strings[] = {
+       [OCFS2_LOCK_TYPE_META] = "Meta",
+       [OCFS2_LOCK_TYPE_DATA] = "Data",
+       [OCFS2_LOCK_TYPE_SUPER] = "Super",
+       [OCFS2_LOCK_TYPE_RENAME] = "Rename",
+       /* Need to differntiate from [R]ename.. serializing writes is the
+        * important job it does, anyway. */
+       [OCFS2_LOCK_TYPE_RW] = "Write/Read",
+       [OCFS2_LOCK_TYPE_DENTRY] = "Dentry",
+};
+
+static inline const char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
+{
+#ifdef __KERNEL__
+       mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type);
+#endif
+       return ocfs2_lock_type_strings[type];
+}
+
 #endif  /* OCFS2_LOCKID_H */
index d17e33e..4c29cd7 100644 (file)
@@ -202,7 +202,7 @@ static int ocfs2_init_global_system_inodes(struct ocfs2_super *osb)
 
        mlog_entry_void();
 
-       new = ocfs2_iget(osb, osb->root_blkno);
+       new = ocfs2_iget(osb, osb->root_blkno, OCFS2_FI_FLAG_SYSFILE);
        if (IS_ERR(new)) {
                status = PTR_ERR(new);
                mlog_errno(status);
@@ -210,7 +210,7 @@ static int ocfs2_init_global_system_inodes(struct ocfs2_super *osb)
        }
        osb->root_inode = new;
 
-       new = ocfs2_iget(osb, osb->system_dir_blkno);
+       new = ocfs2_iget(osb, osb->system_dir_blkno, OCFS2_FI_FLAG_SYSFILE);
        if (IS_ERR(new)) {
                status = PTR_ERR(new);
                mlog_errno(status);
@@ -682,7 +682,7 @@ static struct file_system_type ocfs2_fs_type = {
        .kill_sb        = kill_block_super, /* set to the generic one
                                             * right now, but do we
                                             * need to change that? */
-       .fs_flags       = FS_REQUIRES_DEV,
+       .fs_flags       = FS_REQUIRES_DEV|FS_RENAME_DOES_D_MOVE,
        .next           = NULL
 };
 
index fc29cb7..5df6e35 100644 (file)
 #include <linux/slab.h>
 #include <linux/highmem.h>
 
-#include "ocfs2.h"
-
 #define MLOG_MASK_PREFIX ML_INODE
 #include <cluster/masklog.h>
 
+#include "ocfs2.h"
+
 #include "alloc.h"
 #include "dir.h"
 #include "inode.h"
@@ -115,7 +115,7 @@ static struct inode * _ocfs2_get_system_file_inode(struct ocfs2_super *osb,
                goto bail;
        }
 
-       inode = ocfs2_iget(osb, blkno);
+       inode = ocfs2_iget(osb, blkno, OCFS2_FI_FLAG_SYSFILE);
        if (IS_ERR(inode)) {
                mlog_errno(PTR_ERR(inode));
                inode = NULL;
index cf70fe2..5b4dca7 100644 (file)
@@ -74,9 +74,6 @@ struct ocfs2_vote_msg
                __be32 v_orphaned_slot; /* Used during delete votes */
                __be32 v_nlink;         /* Used during unlink votes */
        } md1;                          /* Message type dependant 1 */
-       __be32 v_unlink_namelen;
-       __be64 v_unlink_parent;
-       u8  v_unlink_dirent[OCFS2_VOTE_FILENAME_LEN];
 };
 
 /* Responses are given these values to maintain backwards
@@ -100,8 +97,6 @@ struct ocfs2_vote_work {
 enum ocfs2_vote_request {
        OCFS2_VOTE_REQ_INVALID = 0,
        OCFS2_VOTE_REQ_DELETE,
-       OCFS2_VOTE_REQ_UNLINK,
-       OCFS2_VOTE_REQ_RENAME,
        OCFS2_VOTE_REQ_MOUNT,
        OCFS2_VOTE_REQ_UMOUNT,
        OCFS2_VOTE_REQ_LAST
@@ -261,103 +256,13 @@ done:
        return response;
 }
 
-static int ocfs2_match_dentry(struct dentry *dentry,
-                             u64 parent_blkno,
-                             unsigned int namelen,
-                             const char *name)
-{
-       struct inode *parent;
-
-       if (!dentry->d_parent) {
-               mlog(0, "Detached from parent.\n");
-               return 0;
-       }
-
-       parent = dentry->d_parent->d_inode;
-       /* Negative parent dentry? */
-       if (!parent)
-               return 0;
-
-       /* Name is in a different directory. */
-       if (OCFS2_I(parent)->ip_blkno != parent_blkno)
-               return 0;
-
-       if (dentry->d_name.len != namelen)
-               return 0;
-
-       /* comparison above guarantees this is safe. */
-       if (memcmp(dentry->d_name.name, name, namelen))
-               return 0;
-
-       return 1;
-}
-
-static void ocfs2_process_dentry_request(struct inode *inode,
-                                        int rename,
-                                        unsigned int new_nlink,
-                                        u64 parent_blkno,
-                                        unsigned int namelen,
-                                        const char *name)
-{
-       struct dentry *dentry = NULL;
-       struct list_head *p;
-       struct ocfs2_inode_info *oi = OCFS2_I(inode);
-
-       mlog(0, "parent %llu, namelen = %u, name = %.*s\n",
-            (unsigned long long)parent_blkno, namelen, namelen, name);
-
-       spin_lock(&dcache_lock);
-
-       /* Another node is removing this name from the system. It is
-        * up to us to find the corresponding dentry and if it exists,
-        * unhash it from the dcache. */
-       list_for_each(p, &inode->i_dentry) {
-               dentry = list_entry(p, struct dentry, d_alias);
-
-               if (ocfs2_match_dentry(dentry, parent_blkno, namelen, name)) {
-                       mlog(0, "dentry found: %.*s\n",
-                            dentry->d_name.len, dentry->d_name.name);
-
-                       dget_locked(dentry);
-                       break;
-               }
-
-               dentry = NULL;
-       }
-
-       spin_unlock(&dcache_lock);
-
-       if (dentry) {
-               d_delete(dentry);
-               dput(dentry);
-       }
-
-       /* rename votes don't send link counts */
-       if (!rename) {
-               mlog(0, "new_nlink = %u\n", new_nlink);
-
-               /* We don't have the proper locks here to directly
-                * change i_nlink and besides, the vote is sent
-                * *before* the operation so it may have failed on the
-                * other node. This passes a hint to ocfs2_drop_inode
-                * to force ocfs2_delete_inode, who will take the
-                * proper cluster locks to sort things out. */
-               if (new_nlink == 0) {
-                       spin_lock(&oi->ip_lock);
-                       oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
-                       spin_unlock(&OCFS2_I(inode)->ip_lock);
-               }
-       }
-}
-
 static void ocfs2_process_vote(struct ocfs2_super *osb,
                               struct ocfs2_vote_msg *msg)
 {
        int net_status, vote_response;
        int orphaned_slot = 0;
-       int rename = 0;
-       unsigned int node_num, generation, new_nlink, namelen;
-       u64 blkno, parent_blkno;
+       unsigned int node_num, generation;
+       u64 blkno;
        enum ocfs2_vote_request request;
        struct inode *inode = NULL;
        struct ocfs2_msg_hdr *hdr = &msg->v_hdr;
@@ -437,18 +342,6 @@ static void ocfs2_process_vote(struct ocfs2_super *osb,
                vote_response = ocfs2_process_delete_request(inode,
                                                             &orphaned_slot);
                break;
-       case OCFS2_VOTE_REQ_RENAME:
-               rename = 1;
-               /* fall through */
-       case OCFS2_VOTE_REQ_UNLINK:
-               parent_blkno = be64_to_cpu(msg->v_unlink_parent);
-               namelen = be32_to_cpu(msg->v_unlink_namelen);
-               /* new_nlink will be ignored in case of a rename vote */
-               new_nlink = be32_to_cpu(msg->md1.v_nlink);
-               ocfs2_process_dentry_request(inode, rename, new_nlink,
-                                            parent_blkno, namelen,
-                                            msg->v_unlink_dirent);
-               break;
        default:
                mlog(ML_ERROR, "node %u, invalid request: %u\n",
                     node_num, request);
@@ -889,75 +782,6 @@ int ocfs2_request_delete_vote(struct inode *inode)
        return status;
 }
 
-static void ocfs2_setup_unlink_vote(struct ocfs2_vote_msg *request,
-                                   struct dentry *dentry)
-{
-       struct inode *parent = dentry->d_parent->d_inode;
-
-       /* We need some values which will uniquely identify a dentry
-        * on the other nodes so that they can find it and run
-        * d_delete against it. Parent directory block and full name
-        * should suffice. */
-
-       mlog(0, "unlink/rename request: parent: %llu name: %.*s\n",
-            (unsigned long long)OCFS2_I(parent)->ip_blkno, dentry->d_name.len,
-            dentry->d_name.name);
-
-       request->v_unlink_parent = cpu_to_be64(OCFS2_I(parent)->ip_blkno);
-       request->v_unlink_namelen = cpu_to_be32(dentry->d_name.len);
-       memcpy(request->v_unlink_dirent, dentry->d_name.name,
-              dentry->d_name.len);
-}
-
-int ocfs2_request_unlink_vote(struct inode *inode,
-                             struct dentry *dentry,
-                             unsigned int nlink)
-{
-       int status;
-       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-       struct ocfs2_vote_msg *request;
-
-       if (dentry->d_name.len > OCFS2_VOTE_FILENAME_LEN)
-               return -ENAMETOOLONG;
-
-       status = -ENOMEM;
-       request = ocfs2_new_vote_request(osb, OCFS2_I(inode)->ip_blkno,
-                                        inode->i_generation,
-                                        OCFS2_VOTE_REQ_UNLINK, nlink);
-       if (request) {
-               ocfs2_setup_unlink_vote(request, dentry);
-
-               status = ocfs2_request_vote(inode, request, NULL);
-
-               kfree(request);
-       }
-       return status;
-}
-
-int ocfs2_request_rename_vote(struct inode *inode,
-                             struct dentry *dentry)
-{
-       int status;
-       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-       struct ocfs2_vote_msg *request;
-
-       if (dentry->d_name.len > OCFS2_VOTE_FILENAME_LEN)
-               return -ENAMETOOLONG;
-
-       status = -ENOMEM;
-       request = ocfs2_new_vote_request(osb, OCFS2_I(inode)->ip_blkno,
-                                        inode->i_generation,
-                                        OCFS2_VOTE_REQ_RENAME, 0);
-       if (request) {
-               ocfs2_setup_unlink_vote(request, dentry);
-
-               status = ocfs2_request_vote(inode, request, NULL);
-
-               kfree(request);
-       }
-       return status;
-}
-
 int ocfs2_request_mount_vote(struct ocfs2_super *osb)
 {
        int status;
index 9cce607..53ebc1c 100644 (file)
@@ -39,11 +39,6 @@ static inline void ocfs2_kick_vote_thread(struct ocfs2_super *osb)
 }
 
 int ocfs2_request_delete_vote(struct inode *inode);
-int ocfs2_request_unlink_vote(struct inode *inode,
-                             struct dentry *dentry,
-                             unsigned int nlink);
-int ocfs2_request_rename_vote(struct inode *inode,
-                             struct dentry *dentry);
 int ocfs2_request_mount_vote(struct ocfs2_super *osb);
 int ocfs2_request_umount_vote(struct ocfs2_super *osb);
 int ocfs2_register_net_handlers(struct ocfs2_super *osb);
index 555bc19..1d3e601 100644 (file)
@@ -92,9 +92,10 @@ extern int dir_notify_enable;
 #define FS_REQUIRES_DEV 1 
 #define FS_BINARY_MOUNTDATA 2
 #define FS_REVAL_DOT   16384   /* Check the paths ".", ".." for staleness */
-#define FS_ODD_RENAME  32768   /* Temporary stuff; will go away as soon
-                                 * as nfs_rename() will be cleaned up
-                                 */
+#define FS_RENAME_DOES_D_MOVE  32768   /* FS will handle d_move()
+                                        * during rename() internally.
+                                        */
+
 /*
  * These are the fs-independent mount-flags: up to 32 flags are supported
  */