X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=net%2Fsunrpc%2Fsched.c;h=4c669121e607f774b7739a326639e8f2aec146a3;hb=ae9458d6a0956aa21cb49e1251e35a8d4dacbe6e;hp=2ac43c41c3a9889664a1c36b5072e1690ce0fea2;hpb=40b42f1ebf653cd72c32eb1a1a0b9fea2dfbfd7d;p=powerpc.git diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 2ac43c41c3..4c669121e6 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -45,13 +45,11 @@ static void rpc_release_task(struct rpc_task *task); /* * RPC tasks sit here while waiting for conditions to improve. */ -static RPC_WAITQ(delay_queue, "delayq"); +static struct rpc_wait_queue delay_queue; /* * rpciod-related stuff */ -static DEFINE_MUTEX(rpciod_mutex); -static atomic_t rpciod_users = ATOMIC_INIT(0); struct workqueue_struct *rpciod_workqueue; /* @@ -137,7 +135,7 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct r if (unlikely(task->tk_priority > queue->maxpriority)) q = &queue->tasks[queue->maxpriority]; list_for_each_entry(t, q, u.tk_wait.list) { - if (t->tk_cookie == task->tk_cookie) { + if (t->tk_owner == task->tk_owner) { list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links); return; } @@ -210,26 +208,26 @@ static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int queue->count = 1 << (priority * 2); } -static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie) +static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid) { - queue->cookie = cookie; + queue->owner = pid; queue->nr = RPC_BATCH_COUNT; } static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue) { rpc_set_waitqueue_priority(queue, queue->maxpriority); - rpc_set_waitqueue_cookie(queue, 0); + rpc_set_waitqueue_owner(queue, 0); } -static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio) +static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues) { int i; spin_lock_init(&queue->lock); for (i = 0; i < ARRAY_SIZE(queue->tasks); i++) INIT_LIST_HEAD(&queue->tasks[i]); - queue->maxpriority = maxprio; + queue->maxpriority = nr_queues - 1; rpc_reset_waitqueue_priority(queue); #ifdef RPC_DEBUG queue->name = qname; @@ -238,18 +236,18 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname) { - __rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH); + __rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY); } void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname) { - __rpc_init_priority_wait_queue(queue, qname, 0); + __rpc_init_priority_wait_queue(queue, qname, 1); } -EXPORT_SYMBOL(rpc_init_wait_queue); +EXPORT_SYMBOL_GPL(rpc_init_wait_queue); -static int rpc_wait_bit_interruptible(void *word) +static int rpc_wait_bit_killable(void *word) { - if (signal_pending(current)) + if (fatal_signal_pending(current)) return -ERESTARTSYS; schedule(); return 0; @@ -301,11 +299,11 @@ static void rpc_mark_complete_task(struct rpc_task *task) int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *)) { if (action == NULL) - action = rpc_wait_bit_interruptible; + action = rpc_wait_bit_killable; return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, - action, TASK_INTERRUPTIBLE); + action, TASK_KILLABLE); } -EXPORT_SYMBOL(__rpc_wait_for_completion_task); +EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); /* * Make an RPC task runnable. @@ -375,6 +373,7 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, __rpc_sleep_on(q, task, action, timer); spin_unlock_bh(&q->lock); } +EXPORT_SYMBOL_GPL(rpc_sleep_on); /** * __rpc_do_wake_up_task - wake up a single rpc_task @@ -446,6 +445,7 @@ void rpc_wake_up_task(struct rpc_task *task) } rcu_read_unlock_bh(); } +EXPORT_SYMBOL_GPL(rpc_wake_up_task); /* * Wake up the next task on a priority queue. @@ -456,12 +456,12 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu struct rpc_task *task; /* - * Service a batch of tasks from a single cookie. + * Service a batch of tasks from a single owner. */ q = &queue->tasks[queue->priority]; if (!list_empty(q)) { task = list_entry(q->next, struct rpc_task, u.tk_wait.list); - if (queue->cookie == task->tk_cookie) { + if (queue->owner == task->tk_owner) { if (--queue->nr) goto out; list_move_tail(&task->u.tk_wait.list, q); @@ -470,7 +470,7 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu * Check if we need to switch queues. */ if (--queue->count) - goto new_cookie; + goto new_owner; } /* @@ -492,8 +492,8 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu new_queue: rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0])); -new_cookie: - rpc_set_waitqueue_cookie(queue, task->tk_cookie); +new_owner: + rpc_set_waitqueue_owner(queue, task->tk_owner); out: __rpc_wake_up_task(task); return task; @@ -521,6 +521,7 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue) return task; } +EXPORT_SYMBOL_GPL(rpc_wake_up_next); /** * rpc_wake_up - wake up all rpc_tasks @@ -546,6 +547,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue) spin_unlock(&queue->lock); rcu_read_unlock_bh(); } +EXPORT_SYMBOL_GPL(rpc_wake_up); /** * rpc_wake_up_status - wake up all rpc_tasks and set their status value. @@ -574,6 +576,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) spin_unlock(&queue->lock); rcu_read_unlock_bh(); } +EXPORT_SYMBOL_GPL(rpc_wake_up_status); static void __rpc_atrun(struct rpc_task *task) { @@ -588,6 +591,7 @@ void rpc_delay(struct rpc_task *task, unsigned long delay) task->tk_timeout = delay; rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun); } +EXPORT_SYMBOL_GPL(rpc_delay); /* * Helper to call task->tk_ops->rpc_call_prepare @@ -616,7 +620,7 @@ void rpc_exit_task(struct rpc_task *task) } } } -EXPORT_SYMBOL(rpc_exit_task); +EXPORT_SYMBOL_GPL(rpc_exit_task); void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata) { @@ -692,10 +696,9 @@ static void __rpc_execute(struct rpc_task *task) /* sync task: sleep here */ dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid); - /* Note: Caller should be using rpc_clnt_sigmask() */ status = out_of_line_wait_on_bit(&task->tk_runstate, - RPC_TASK_QUEUED, rpc_wait_bit_interruptible, - TASK_INTERRUPTIBLE); + RPC_TASK_QUEUED, rpc_wait_bit_killable, + TASK_KILLABLE); if (status == -ERESTARTSYS) { /* * When a sync task receives a signal, it exits with @@ -779,6 +782,7 @@ void *rpc_malloc(struct rpc_task *task, size_t size) task->tk_pid, size, buf); return &buf->data; } +EXPORT_SYMBOL_GPL(rpc_malloc); /** * rpc_free - free buffer allocated via rpc_malloc @@ -804,50 +808,58 @@ void rpc_free(void *buffer) else kfree(buf); } +EXPORT_SYMBOL_GPL(rpc_free); /* * Creation and deletion of RPC task structures */ -void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata) +static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data) { memset(task, 0, sizeof(*task)); - init_timer(&task->tk_timer); - task->tk_timer.data = (unsigned long) task; - task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer; + setup_timer(&task->tk_timer, (void (*)(unsigned long))rpc_run_timer, + (unsigned long)task); atomic_set(&task->tk_count, 1); - task->tk_client = clnt; - task->tk_flags = flags; - task->tk_ops = tk_ops; - if (tk_ops->rpc_call_prepare != NULL) - task->tk_action = rpc_prepare_task; - task->tk_calldata = calldata; + task->tk_flags = task_setup_data->flags; + task->tk_ops = task_setup_data->callback_ops; + task->tk_calldata = task_setup_data->callback_data; INIT_LIST_HEAD(&task->tk_task); /* Initialize retry counters */ task->tk_garb_retry = 2; task->tk_cred_retry = 2; - task->tk_priority = RPC_PRIORITY_NORMAL; - task->tk_cookie = (unsigned long)current; + task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW; + task->tk_owner = current->tgid; /* Initialize workqueue for async tasks */ task->tk_workqueue = rpciod_workqueue; - if (clnt) { - kref_get(&clnt->cl_kref); - if (clnt->cl_softrtry) + task->tk_client = task_setup_data->rpc_client; + if (task->tk_client != NULL) { + kref_get(&task->tk_client->cl_kref); + if (task->tk_client->cl_softrtry) task->tk_flags |= RPC_TASK_SOFT; - if (!clnt->cl_intr) - task->tk_flags |= RPC_TASK_NOINTR; } - BUG_ON(task->tk_ops == NULL); + if (task->tk_ops->rpc_call_prepare != NULL) + task->tk_action = rpc_prepare_task; + + if (task_setup_data->rpc_message != NULL) { + memcpy(&task->tk_msg, task_setup_data->rpc_message, sizeof(task->tk_msg)); + /* Bind the user cred */ + if (task->tk_msg.rpc_cred != NULL) + rpcauth_holdcred(task); + else + rpcauth_bindcred(task); + if (task->tk_action == NULL) + rpc_call_start(task); + } /* starting timestamp */ task->tk_start = jiffies; dprintk("RPC: new task initialized, procpid %u\n", - current->pid); + task_pid_nr(current)); } static struct rpc_task * @@ -866,18 +878,22 @@ static void rpc_free_task(struct rcu_head *rcu) /* * Create a new task for the specified client. */ -struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata) +struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data) { - struct rpc_task *task; - - task = rpc_alloc_task(); - if (!task) - goto out; + struct rpc_task *task = setup_data->task; + unsigned short flags = 0; + + if (task == NULL) { + task = rpc_alloc_task(); + if (task == NULL) + goto out; + flags = RPC_TASK_DYNAMIC; + } - rpc_init_task(task, clnt, flags, tk_ops, calldata); + rpc_init_task(task, setup_data); + task->tk_flags |= flags; dprintk("RPC: allocated task %p\n", task); - task->tk_flags |= RPC_TASK_DYNAMIC; out: return task; } @@ -903,7 +919,7 @@ void rpc_put_task(struct rpc_task *task) call_rcu_bh(&task->u.tk_rcu, rpc_free_task); rpc_release_calldata(tk_ops, calldata); } -EXPORT_SYMBOL(rpc_put_task); +EXPORT_SYMBOL_GPL(rpc_put_task); static void rpc_release_task(struct rpc_task *task) { @@ -960,61 +976,51 @@ void rpc_killall_tasks(struct rpc_clnt *clnt) } spin_unlock(&clnt->cl_lock); } +EXPORT_SYMBOL_GPL(rpc_killall_tasks); + +int rpciod_up(void) +{ + return try_module_get(THIS_MODULE) ? 0 : -EINVAL; +} + +void rpciod_down(void) +{ + module_put(THIS_MODULE); +} /* - * Start up the rpciod process if it's not already running. + * Start up the rpciod workqueue. */ -int -rpciod_up(void) +static int rpciod_start(void) { struct workqueue_struct *wq; - int error = 0; - - if (atomic_inc_not_zero(&rpciod_users)) - return 0; - - mutex_lock(&rpciod_mutex); - /* Guard against races with rpciod_down() */ - if (rpciod_workqueue != NULL) - goto out_ok; /* * Create the rpciod thread and wait for it to start. */ dprintk("RPC: creating workqueue rpciod\n"); - error = -ENOMEM; wq = create_workqueue("rpciod"); - if (wq == NULL) - goto out; - rpciod_workqueue = wq; - error = 0; -out_ok: - atomic_inc(&rpciod_users); -out: - mutex_unlock(&rpciod_mutex); - return error; + return rpciod_workqueue != NULL; } -void -rpciod_down(void) +static void rpciod_stop(void) { - if (!atomic_dec_and_test(&rpciod_users)) - return; + struct workqueue_struct *wq = NULL; - mutex_lock(&rpciod_mutex); + if (rpciod_workqueue == NULL) + return; dprintk("RPC: destroying workqueue rpciod\n"); - if (atomic_read(&rpciod_users) == 0 && rpciod_workqueue != NULL) { - destroy_workqueue(rpciod_workqueue); - rpciod_workqueue = NULL; - } - mutex_unlock(&rpciod_mutex); + wq = rpciod_workqueue; + rpciod_workqueue = NULL; + destroy_workqueue(wq); } void rpc_destroy_mempool(void) { + rpciod_stop(); if (rpc_buffer_mempool) mempool_destroy(rpc_buffer_mempool); if (rpc_task_mempool) @@ -1031,13 +1037,13 @@ rpc_init_mempool(void) rpc_task_slabp = kmem_cache_create("rpc_tasks", sizeof(struct rpc_task), 0, SLAB_HWCACHE_ALIGN, - NULL, NULL); + NULL); if (!rpc_task_slabp) goto err_nomem; rpc_buffer_slabp = kmem_cache_create("rpc_buffers", RPC_BUFFER_MAXSIZE, 0, SLAB_HWCACHE_ALIGN, - NULL, NULL); + NULL); if (!rpc_buffer_slabp) goto err_nomem; rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE, @@ -1048,6 +1054,13 @@ rpc_init_mempool(void) rpc_buffer_slabp); if (!rpc_buffer_mempool) goto err_nomem; + if (!rpciod_start()) + goto err_nomem; + /* + * The following is not strictly a mempool initialisation, + * but there is no harm in doing it here + */ + rpc_init_wait_queue(&delay_queue, "delayq"); return 0; err_nomem: rpc_destroy_mempool();