basic modification from way back
[powerpc.git] / fs / ocfs2 / dlm / dlmrecovery.c
index 367a11e..671c4ed 100644 (file)
@@ -163,9 +163,6 @@ void dlm_dispatch_work(struct work_struct *work)
        dlm_workfunc_t *workfunc;
        int tot=0;
 
-       if (!dlm_joined(dlm))
-               return;
-
        spin_lock(&dlm->work_lock);
        list_splice_init(&dlm->work_list, &tmp_list);
        spin_unlock(&dlm->work_lock);
@@ -614,6 +611,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
                        }
                } while (status != 0);
 
+               spin_lock(&dlm_reco_state_lock);
                switch (ndata->state) {
                        case DLM_RECO_NODE_DATA_INIT:
                        case DLM_RECO_NODE_DATA_FINALIZE_SENT:
@@ -644,6 +642,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
                                     ndata->node_num, dead_node);
                                break;
                }
+               spin_unlock(&dlm_reco_state_lock);
        }
 
        mlog(0, "done requesting all lock info\n");
@@ -821,7 +820,8 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
 
 }
 
-int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
+                                 void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;
@@ -978,7 +978,8 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
 }
 
 
-int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
+                              void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;
@@ -1129,6 +1130,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
        if (total_locks == mres_total_locks)
                mres->flags |= DLM_MRES_ALL_DONE;
 
+       mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
+            dlm->name, res->lockname.len, res->lockname.name,
+            orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
+            send_to);
+
        /* send it */
        ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
                                 sz, send_to, &status);
@@ -1213,6 +1219,34 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
        return 0;
 }
 
+static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
+                              struct dlm_migratable_lockres *mres)
+{
+       struct dlm_lock dummy;
+       memset(&dummy, 0, sizeof(dummy));
+       dummy.ml.cookie = 0;
+       dummy.ml.type = LKM_IVMODE;
+       dummy.ml.convert_type = LKM_IVMODE;
+       dummy.ml.highest_blocked = LKM_IVMODE;
+       dummy.lksb = NULL;
+       dummy.ml.node = dlm->node_num;
+       dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
+}
+
+static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
+                                   struct dlm_migratable_lock *ml,
+                                   u8 *nodenum)
+{
+       if (unlikely(ml->cookie == 0 &&
+           ml->type == LKM_IVMODE &&
+           ml->convert_type == LKM_IVMODE &&
+           ml->highest_blocked == LKM_IVMODE &&
+           ml->list == DLM_BLOCKED_LIST)) {
+               *nodenum = ml->node;
+               return 1;
+       }
+       return 0;
+}
 
 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                         struct dlm_migratable_lockres *mres,
@@ -1260,6 +1294,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                                goto error;
                }
        }
+       if (total_locks == 0) {
+               /* send a dummy lock to indicate a mastery reference only */
+               mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
+                    dlm->name, res->lockname.len, res->lockname.name,
+                    send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
+                    "migration");
+               dlm_add_dummy_lock(dlm, mres);
+       }
        /* flush any remaining locks */
        ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
        if (ret < 0)
@@ -1293,7 +1335,8 @@ error:
  * do we spin?  returning an error only delays the problem really
  */
 
-int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+                           void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_migratable_lockres *mres =
@@ -1382,17 +1425,21 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
                spin_lock(&res->spinlock);
                res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
                spin_unlock(&res->spinlock);
+               wake_up(&res->wq);
 
                /* add an extra ref for just-allocated lockres 
                 * otherwise the lockres will be purged immediately */
                dlm_lockres_get(res);
-
        }
 
        /* at this point we have allocated everything we need,
         * and we have a hashed lockres with an extra ref and
         * the proper res->state flags. */
        ret = 0;
+       spin_lock(&res->spinlock);
+       /* drop this either when master requery finds a different master
+        * or when a lock is added by the recovery worker */
+       dlm_lockres_grab_inflight_ref(dlm, res);
        if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
                /* migration cannot have an unknown master */
                BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
@@ -1400,10 +1447,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
                          "unknown owner.. will need to requery: "
                          "%.*s\n", mres->lockname_len, mres->lockname);
        } else {
-               spin_lock(&res->spinlock);
+               /* take a reference now to pin the lockres, drop it
+                * when locks are added in the worker */
                dlm_change_lockres_owner(dlm, res, dlm->node_num);
-               spin_unlock(&res->spinlock);
        }
+       spin_unlock(&res->spinlock);
 
        /* queue up work for dlm_mig_lockres_worker */
        dlm_grab(dlm);  /* get an extra ref for the work item */
@@ -1459,6 +1507,9 @@ again:
                                   "this node will take it.\n",
                                   res->lockname.len, res->lockname.name);
                } else {
+                       spin_lock(&res->spinlock);
+                       dlm_lockres_drop_inflight_ref(dlm, res);
+                       spin_unlock(&res->spinlock);
                        mlog(0, "master needs to respond to sender "
                                  "that node %u still owns %.*s\n",
                                  real_master, res->lockname.len,
@@ -1578,7 +1629,8 @@ int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
 /* this function cannot error, so unless the sending
  * or receiving of the message failed, the owner can
  * be trusted */
-int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
+                              void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
@@ -1660,21 +1712,38 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
 {
        struct dlm_migratable_lock *ml;
        struct list_head *queue;
+       struct list_head *tmpq = NULL;
        struct dlm_lock *newlock = NULL;
        struct dlm_lockstatus *lksb = NULL;
        int ret = 0;
-       int i, bad;
+       int i, j, bad;
        struct list_head *iter;
        struct dlm_lock *lock = NULL;
+       u8 from = O2NM_MAX_NODES;
+       unsigned int added = 0;
 
        mlog(0, "running %d locks for this lockres\n", mres->num_locks);
        for (i=0; i<mres->num_locks; i++) {
                ml = &(mres->ml[i]);
+
+               if (dlm_is_dummy_lock(dlm, ml, &from)) {
+                       /* placeholder, just need to set the refmap bit */
+                       BUG_ON(mres->num_locks != 1);
+                       mlog(0, "%s:%.*s: dummy lock for %u\n",
+                            dlm->name, mres->lockname_len, mres->lockname,
+                            from);
+                       spin_lock(&res->spinlock);
+                       dlm_lockres_set_refmap_bit(from, res);
+                       spin_unlock(&res->spinlock);
+                       added++;
+                       break;
+               }
                BUG_ON(ml->highest_blocked != LKM_IVMODE);
                newlock = NULL;
                lksb = NULL;
 
                queue = dlm_list_num_to_pointer(res, ml->list);
+               tmpq = NULL;
 
                /* if the lock is for the local node it needs to
                 * be moved to the proper location within the queue.
@@ -1684,26 +1753,39 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                        BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));
 
                        spin_lock(&res->spinlock);
-                       list_for_each(iter, queue) {
-                               lock = list_entry (iter, struct dlm_lock, list);
-                               if (lock->ml.cookie != ml->cookie)
-                                       lock = NULL;
-                               else
+                       for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
+                               tmpq = dlm_list_idx_to_ptr(res, j);
+                               list_for_each(iter, tmpq) {
+                                       lock = list_entry (iter, struct dlm_lock, list);
+                                       if (lock->ml.cookie != ml->cookie)
+                                               lock = NULL;
+                                       else
+                                               break;
+                               }
+                               if (lock)
                                        break;
                        }
 
                        /* lock is always created locally first, and
                         * destroyed locally last.  it must be on the list */
                        if (!lock) {
-                               u64 c = ml->cookie;
+                               __be64 c = ml->cookie;
                                mlog(ML_ERROR, "could not find local lock "
                                               "with cookie %u:%llu!\n",
-                                              dlm_get_lock_cookie_node(c),
-                                              dlm_get_lock_cookie_seq(c));
+                                    dlm_get_lock_cookie_node(be64_to_cpu(c)),
+                                    dlm_get_lock_cookie_seq(be64_to_cpu(c)));
+                               __dlm_print_one_lock_resource(res);
                                BUG();
                        }
                        BUG_ON(lock->ml.node != ml->node);
 
+                       if (tmpq != queue) {
+                               mlog(0, "lock was on %u instead of %u for %.*s\n",
+                                    j, ml->list, res->lockname.len, res->lockname.name);
+                               spin_unlock(&res->spinlock);
+                               continue;
+                       }
+
                        /* see NOTE above about why we do not update
                         * to match the master here */
 
@@ -1711,6 +1793,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                        /* do not alter lock refcount.  switching lists. */
                        list_move_tail(&lock->list, queue);
                        spin_unlock(&res->spinlock);
+                       added++;
 
                        mlog(0, "just reordered a local lock!\n");
                        continue;
@@ -1795,18 +1878,18 @@ skip_lvb:
                spin_lock(&res->spinlock);
                list_for_each_entry(lock, queue, list) {
                        if (lock->ml.cookie == ml->cookie) {
-                               u64 c = lock->ml.cookie;
+                               __be64 c = lock->ml.cookie;
                                mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
                                     "exists on this lockres!\n", dlm->name,
                                     res->lockname.len, res->lockname.name,
-                                    dlm_get_lock_cookie_node(c),
-                                    dlm_get_lock_cookie_seq(c));
+                                    dlm_get_lock_cookie_node(be64_to_cpu(c)),
+                                    dlm_get_lock_cookie_seq(be64_to_cpu(c)));
 
                                mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
                                     "node=%u, cookie=%u:%llu, queue=%d\n",
                                     ml->type, ml->convert_type, ml->node,
-                                    dlm_get_lock_cookie_node(ml->cookie),
-                                    dlm_get_lock_cookie_seq(ml->cookie),
+                                    dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
+                                    dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
                                     ml->list);
 
                                __dlm_print_one_lock_resource(res);
@@ -1817,12 +1900,22 @@ skip_lvb:
                if (!bad) {
                        dlm_lock_get(newlock);
                        list_add_tail(&newlock->list, queue);
+                       mlog(0, "%s:%.*s: added lock for node %u, "
+                            "setting refmap bit\n", dlm->name,
+                            res->lockname.len, res->lockname.name, ml->node);
+                       dlm_lockres_set_refmap_bit(ml->node, res);
+                       added++;
                }
                spin_unlock(&res->spinlock);
        }
        mlog(0, "done running all the locks\n");
 
 leave:
+       /* balance the ref taken when the work was queued */
+       spin_lock(&res->spinlock);
+       dlm_lockres_drop_inflight_ref(dlm, res);
+       spin_unlock(&res->spinlock);
+
        if (ret < 0) {
                mlog_errno(ret);
                if (newlock)
@@ -1935,9 +2028,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
                if (res->owner == dead_node) {
                        list_del_init(&res->recovering);
                        spin_lock(&res->spinlock);
+                       /* new_master has our reference from
+                        * the lock state sent during recovery */
                        dlm_change_lockres_owner(dlm, res, new_master);
                        res->state &= ~DLM_LOCK_RES_RECOVERING;
-                       if (!__dlm_lockres_unused(res))
+                       if (__dlm_lockres_has_locks(res))
                                __dlm_dirty_lockres(dlm, res);
                        spin_unlock(&res->spinlock);
                        wake_up(&res->wq);
@@ -1977,9 +2072,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
                                        dlm_lockres_put(res);
                                }
                                spin_lock(&res->spinlock);
+                               /* new_master has our reference from
+                                * the lock state sent during recovery */
                                dlm_change_lockres_owner(dlm, res, new_master);
                                res->state &= ~DLM_LOCK_RES_RECOVERING;
-                               if (!__dlm_lockres_unused(res))
+                               if (__dlm_lockres_has_locks(res))
                                        __dlm_dirty_lockres(dlm, res);
                                spin_unlock(&res->spinlock);
                                wake_up(&res->wq);
@@ -2048,6 +2145,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
 {
        struct list_head *iter, *tmpiter;
        struct dlm_lock *lock;
+       unsigned int freed = 0;
 
        /* this node is the lockres master:
         * 1) remove any stale locks for the dead node
@@ -2062,6 +2160,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
        list_for_each_safe(iter, tmpiter, &res->converting) {
@@ -2069,6 +2168,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
        list_for_each_safe(iter, tmpiter, &res->blocked) {
@@ -2076,9 +2176,23 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
 
+       if (freed) {
+               mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
+                    "dropping ref from lockres\n", dlm->name,
+                    res->lockname.len, res->lockname.name, freed, dead_node);
+               BUG_ON(!test_bit(dead_node, res->refmap));
+               dlm_lockres_clear_refmap_bit(dead_node, res);
+       } else if (test_bit(dead_node, res->refmap)) {
+               mlog(0, "%s:%.*s: dead node %u had a ref, but had "
+                    "no locks and had not purged before dying\n", dlm->name,
+                    res->lockname.len, res->lockname.name, dead_node);
+               dlm_lockres_clear_refmap_bit(dead_node, res);
+       }
+
        /* do not kick thread yet */
        __dlm_dirty_lockres(dlm, res);
 }
@@ -2141,9 +2255,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
                        spin_lock(&res->spinlock);
                        /* zero the lvb if necessary */
                        dlm_revalidate_lvb(dlm, res, dead_node);
-                       if (res->owner == dead_node)
+                       if (res->owner == dead_node) {
+                               if (res->state & DLM_LOCK_RES_DROPPING_REF)
+                                       mlog(0, "%s:%.*s: owned by "
+                                            "dead node %u, this node was "
+                                            "dropping its ref when it died. "
+                                            "continue, dropping the flag.\n",
+                                            dlm->name, res->lockname.len,
+                                            res->lockname.name, dead_node);
+
+                               /* the wake_up for this will happen when the
+                                * RECOVERING flag is dropped later */
+                               res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+
                                dlm_move_lockres_to_recovery_list(dlm, res);
-                       else if (res->owner == dlm->node_num) {
+                       else if (res->owner == dlm->node_num) {
                                dlm_free_dead_locks(dlm, res, dead_node);
                                __dlm_lockres_calc_usage(dlm, res);
                        }
@@ -2480,7 +2606,8 @@ retry:
        return ret;
 }
 
-int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+                          void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf;
@@ -2608,7 +2735,8 @@ stage2:
        return ret;
 }
 
-int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+                             void **ret_data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;