X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=fs%2Fdlm%2Frcom.c;h=6bfbd61538094f72cb1001bc782cc246faf55552;hb=b563d6f30d937510e02541930b1558d0f5759413;hp=55fbe313340e59a212788ba5cefedd92ded4a6c6;hpb=c6a756795d5ba0637aae8da89dd11bb7e3a1ee74;p=powerpc.git diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index 55fbe31334..6bfbd61538 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -56,6 +56,10 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, rc->rc_type = type; + spin_lock(&ls->ls_recover_lock); + rc->rc_seq = ls->ls_recover_seq; + spin_unlock(&ls->ls_recover_lock); + *mh_ret = mh; *rc_ret = rc; return 0; @@ -78,8 +82,17 @@ static void make_config(struct dlm_ls *ls, struct rcom_config *rf) rf->rf_lsflags = ls->ls_exflags; } -static int check_config(struct dlm_ls *ls, struct rcom_config *rf, int nodeid) +static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) { + struct rcom_config *rf = (struct rcom_config *) rc->rc_buf; + + if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) { + log_error(ls, "version mismatch: %x nodeid %d: %x", + DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid, + rc->rc_header.h_version); + return -EINVAL; + } + if (rf->rf_lvblen != ls->ls_lvblen || rf->rf_lsflags != ls->ls_exflags) { log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x", @@ -90,13 +103,29 @@ static int check_config(struct dlm_ls *ls, struct rcom_config *rf, int nodeid) return 0; } +static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq) +{ + spin_lock(&ls->ls_rcom_spin); + *new_seq = ++ls->ls_rcom_seq; + set_bit(LSFL_RCOM_WAIT, &ls->ls_flags); + spin_unlock(&ls->ls_rcom_spin); +} + +static void disallow_sync_reply(struct dlm_ls *ls) +{ + spin_lock(&ls->ls_rcom_spin); + clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); + clear_bit(LSFL_RCOM_READY, &ls->ls_flags); + spin_unlock(&ls->ls_rcom_spin); +} + int dlm_rcom_status(struct dlm_ls *ls, int nodeid) { struct dlm_rcom *rc; struct dlm_mhandle *mh; int error = 0; - memset(ls->ls_recover_buf, 0, dlm_config.buffer_size); + ls->ls_recover_nodeid = nodeid; if (nodeid == dlm_our_nodeid()) { rc = (struct dlm_rcom *) ls->ls_recover_buf; @@ -108,10 +137,13 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid) if (error) goto out; + allow_sync_reply(ls, &rc->rc_id); + memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size); + send_rcom(ls, mh, rc); error = dlm_wait_function(ls, &rcom_response); - clear_bit(LSFL_RCOM_READY, &ls->ls_flags); + disallow_sync_reply(ls); if (error) goto out; @@ -122,8 +154,7 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid) log_debug(ls, "remote node %d not ready", nodeid); rc->rc_result = 0; } else - error = check_config(ls, (struct rcom_config *) rc->rc_buf, - nodeid); + error = check_config(ls, rc, nodeid); /* the caller looks at rc_result for the remote recovery status */ out: return error; @@ -139,17 +170,36 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) sizeof(struct rcom_config), &rc, &mh); if (error) return; + rc->rc_id = rc_in->rc_id; + rc->rc_seq_reply = rc_in->rc_seq; rc->rc_result = dlm_recover_status(ls); make_config(ls, (struct rcom_config *) rc->rc_buf); send_rcom(ls, mh, rc); } -static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) { + spin_lock(&ls->ls_rcom_spin); + if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) || + rc_in->rc_id != ls->ls_rcom_seq) { + log_debug(ls, "reject reply %d from %d seq %llx expect %llx", + rc_in->rc_type, rc_in->rc_header.h_nodeid, + (unsigned long long)rc_in->rc_id, + (unsigned long long)ls->ls_rcom_seq); + goto out; + } memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length); set_bit(LSFL_RCOM_READY, &ls->ls_flags); + clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); wake_up(&ls->ls_wait_general); + out: + spin_unlock(&ls->ls_rcom_spin); +} + +static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) +{ + receive_sync_reply(ls, rc_in); } int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) @@ -158,12 +208,12 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) struct dlm_mhandle *mh; int error = 0, len = sizeof(struct dlm_rcom); - memset(ls->ls_recover_buf, 0, dlm_config.buffer_size); + ls->ls_recover_nodeid = nodeid; if (nodeid == dlm_our_nodeid()) { dlm_copy_master_names(ls, last_name, last_len, ls->ls_recover_buf + len, - dlm_config.buffer_size - len, nodeid); + dlm_config.ci_buffer_size - len, nodeid); goto out; } @@ -172,10 +222,13 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) goto out; memcpy(rc->rc_buf, last_name, last_len); + allow_sync_reply(ls, &rc->rc_id); + memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size); + send_rcom(ls, mh, rc); error = dlm_wait_function(ls, &rcom_response); - clear_bit(LSFL_RCOM_READY, &ls->ls_flags); + disallow_sync_reply(ls); out: return error; } @@ -184,29 +237,17 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) { struct dlm_rcom *rc; struct dlm_mhandle *mh; - int error, inlen, outlen; - int nodeid = rc_in->rc_header.h_nodeid; - uint32_t status = dlm_recover_status(ls); - - /* - * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while - * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes). - * It could only happen in rare cases where we get a late NAMES - * message from a previous instance of recovery. - */ - - if (!(status & DLM_RS_NODES)) { - log_debug(ls, "ignoring RCOM_NAMES from %u", nodeid); - return; - } + int error, inlen, outlen, nodeid; nodeid = rc_in->rc_header.h_nodeid; inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom); - outlen = dlm_config.buffer_size - sizeof(struct dlm_rcom); + outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom); error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh); if (error) return; + rc->rc_id = rc_in->rc_id; + rc->rc_seq_reply = rc_in->rc_seq; dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, nodeid); @@ -215,9 +256,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) static void receive_rcom_names_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) { - memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length); - set_bit(LSFL_RCOM_READY, &ls->ls_flags); - wake_up(&ls->ls_wait_general); + receive_sync_reply(ls, rc_in); } int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) @@ -255,6 +294,7 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) ret_nodeid = error; rc->rc_result = ret_nodeid; rc->rc_id = rc_in->rc_id; + rc->rc_seq_reply = rc_in->rc_seq; send_rcom(ls, mh, rc); } @@ -336,29 +376,23 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); rc->rc_id = rc_in->rc_id; + rc->rc_seq_reply = rc_in->rc_seq; send_rcom(ls, mh, rc); } static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) { - uint32_t status = dlm_recover_status(ls); - - if (!(status & DLM_RS_DIR)) { - log_debug(ls, "ignoring RCOM_LOCK_REPLY from %u", - rc_in->rc_header.h_nodeid); - return; - } - dlm_recover_process_copy(ls, rc_in); } static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) { struct dlm_rcom *rc; + struct rcom_config *rf; struct dlm_mhandle *mh; char *mb; - int mb_len = sizeof(struct dlm_rcom); + int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config); mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_KERNEL, &mb); if (!mh) @@ -374,14 +408,44 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) rc->rc_header.h_cmd = DLM_RCOM; rc->rc_type = DLM_RCOM_STATUS_REPLY; + rc->rc_id = rc_in->rc_id; + rc->rc_seq_reply = rc_in->rc_seq; rc->rc_result = -ESRCH; + rf = (struct rcom_config *) rc->rc_buf; + rf->rf_lvblen = -1; + dlm_rcom_out(rc); dlm_lowcomms_commit_buffer(mh); return 0; } +static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc) +{ + uint64_t seq; + int rv = 0; + + switch (rc->rc_type) { + case DLM_RCOM_STATUS_REPLY: + case DLM_RCOM_NAMES_REPLY: + case DLM_RCOM_LOOKUP_REPLY: + case DLM_RCOM_LOCK_REPLY: + spin_lock(&ls->ls_recover_lock); + seq = ls->ls_recover_seq; + spin_unlock(&ls->ls_recover_lock); + if (rc->rc_seq_reply != seq) { + log_debug(ls, "ignoring old reply %x from %d " + "seq_reply %llx expect %llx", + rc->rc_type, rc->rc_header.h_nodeid, + (unsigned long long)rc->rc_seq_reply, + (unsigned long long)seq); + rv = 1; + } + } + return rv; +} + /* Called by dlm_recvd; corresponds to dlm_receive_message() but special recovery-only comms are sent through here. */ @@ -397,18 +461,22 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid) ls = dlm_find_lockspace_global(hd->h_lockspace); if (!ls) { - log_print("lockspace %x from %d not found", - hd->h_lockspace, nodeid); - send_ls_not_ready(nodeid, rc); + log_print("lockspace %x from %d type %x not found", + hd->h_lockspace, nodeid, rc->rc_type); + if (rc->rc_type == DLM_RCOM_STATUS) + send_ls_not_ready(nodeid, rc); return; } if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) { - log_error(ls, "ignoring recovery message %x from %d", + log_debug(ls, "ignoring recovery message %x from %d", rc->rc_type, nodeid); goto out; } + if (is_old_reply(ls, rc)) + goto out; + if (nodeid != rc->rc_header.h_nodeid) { log_error(ls, "bad rcom nodeid %d from %d", rc->rc_header.h_nodeid, nodeid);