/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#define __KERNEL_SYSCALLS__
#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/unistd.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY         RPCDBG_SCHED
static int                      rpc_task_id;
#endif

/*
 * We give RPC the same get_free_pages priority as NFS
 */
#define GFP_RPC                 GFP_NOFS

static void                     __rpc_default_timer(struct rpc_task *task);
static void                     rpciod_killall(void);

/*
 * When an asynchronous RPC task is activated within a bottom half
 * handler, or while executing another RPC task, it is put on
 * schedq, and rpciod is woken up.
 */
static RPC_WAITQ(schedq, "schedq");

/*
 * RPC tasks that create another task (e.g. for contacting the portmapper)
 * will wait on this queue for their child's completion
 */
static RPC_WAITQ(childq, "childq");

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 */
static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle);
static DECLARE_WAIT_QUEUE_HEAD(rpciod_killer);
static DECLARE_MUTEX(rpciod_sema);
static unsigned int             rpciod_users;
static pid_t                    rpciod_pid;
static int                      rpc_inhibit;

/*
 * Spinlock for wait queues. Access to the latter also has to be
 * interrupt-safe in order to allow timers to wake up sleeping tasks.
 */
static spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
/*
 * Spinlock for other critical sections of code.
 */
static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;

/*
 * This is the last-ditch buffer for NFS swap requests
 */
static u32                      swap_buffer[PAGE_SIZE >> 2];
static long                     swap_buffer_used;

/*
 * Make allocation of the swap_buffer SMP-safe
 */
static __inline__ int rpc_lock_swapbuf(void)
{
        return !test_and_set_bit(1, &swap_buffer_used);
}
static __inline__ void rpc_unlock_swapbuf(void)
{
        clear_bit(1, &swap_buffer_used);
}

/*
 * Disable the timer for a given RPC task. Should be called with the
 * rpc_queue_lock held and bottom halves disabled in order to avoid
 * races with rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
        dprintk("RPC: %4d disabling timer\n", task->tk_pid);
        task->tk_timeout_fn = NULL;
        task->tk_timeout = 0;
}

/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void
rpc_run_timer(struct rpc_task *task)
{
        void (*callback)(struct rpc_task *);

        spin_lock_bh(&rpc_queue_lock);
        callback = task->tk_timeout_fn;
        task->tk_timeout_fn = NULL;
        spin_unlock_bh(&rpc_queue_lock);
        if (callback) {
                dprintk("RPC: %4d running timer\n", task->tk_pid);
                callback(task);
        }
}

/*
 * Set up a timer for the current task.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
        if (!task->tk_timeout)
                return;

        dprintk("RPC: %4d setting alarm for %lu ms\n",
                        task->tk_pid, task->tk_timeout * 1000 / HZ);

        if (timer)
                task->tk_timeout_fn = timer;
        else
                task->tk_timeout_fn = __rpc_default_timer;
        mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

/*
 * Set up a timer for an already sleeping task.
 */
void rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
        spin_lock_bh(&rpc_queue_lock);
        if (!RPC_IS_RUNNING(task))
                __rpc_add_timer(task, timer);
        spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding rpc_queue_lock.
 */
static inline void
rpc_delete_timer(struct rpc_task *task)
{
        if (timer_pending(&task->tk_timer)) {
                dprintk("RPC: %4d deleting timer\n", task->tk_pid);
                del_timer_sync(&task->tk_timer);
        }
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static inline int
__rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
        if (task->tk_rpcwait == queue)
                return 0;

        if (task->tk_rpcwait) {
                printk(KERN_WARNING "RPC: doubly enqueued task!\n");
                return -EWOULDBLOCK;
        }
        if (RPC_IS_SWAPPER(task))
                list_add(&task->tk_list, &queue->tasks);
        else
                list_add_tail(&task->tk_list, &queue->tasks);
        task->tk_rpcwait = queue;

        dprintk("RPC: %4d added to queue %p \"%s\"\n",
                                task->tk_pid, queue, rpc_qname(queue));

        return 0;
}

int
rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
{
        int             result;

        spin_lock_bh(&rpc_queue_lock);
        result = __rpc_add_wait_queue(q, task);
        spin_unlock_bh(&rpc_queue_lock);
        return result;
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static inline void
__rpc_remove_wait_queue(struct rpc_task *task)
{
        struct rpc_wait_queue *queue = task->tk_rpcwait;

        if (!queue)
                return;

        list_del(&task->tk_list);
        task->tk_rpcwait = NULL;

        dprintk("RPC: %4d removed from queue %p \"%s\"\n",
                                task->tk_pid, queue, rpc_qname(queue));
}

void
rpc_remove_wait_queue(struct rpc_task *task)
{
        if (!task->tk_rpcwait)
                return;
        spin_lock_bh(&rpc_queue_lock);
        __rpc_remove_wait_queue(task);
        spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static inline void
rpc_make_runnable(struct rpc_task *task)
{
        if (task->tk_timeout_fn) {
                printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
                return;
        }
        rpc_set_running(task);
        if (RPC_IS_ASYNC(task)) {
                if (RPC_IS_SLEEPING(task)) {
                        int status;
                        status = __rpc_add_wait_queue(&schedq, task);
                        if (status < 0) {
                                printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
                                task->tk_status = status;
                                return;
                        }
                        rpc_clear_sleeping(task);
                        if (waitqueue_active(&rpciod_idle))
                                wake_up(&rpciod_idle);
                }
        } else {
                rpc_clear_sleeping(task);
                if (waitqueue_active(&task->tk_wait))
                        wake_up(&task->tk_wait);
        }
}

/*
 * Place a newly initialized task on the schedq.
 */
static inline void
rpc_schedule_run(struct rpc_task *task)
{
        /* Don't run a child twice! */
        if (RPC_IS_ACTIVATED(task))
                return;
        task->tk_active = 1;
        rpc_set_sleeping(task);
        rpc_make_runnable(task);
}

/*
 *      For other people who may need to wake the I/O daemon
 *      but should (for now) know nothing about its innards
 */
void rpciod_wake_up(void)
{
        if (rpciod_pid == 0)
                printk(KERN_ERR "rpciod: wot no daemon?\n");
        if (waitqueue_active(&rpciod_idle))
                wake_up(&rpciod_idle);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void
__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                        rpc_action action, rpc_action timer)
{
        int status;

        dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
                                rpc_qname(q), jiffies);

        if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
                printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
                return;
        }

        /* Mark the task as being activated if so needed */
        if (!RPC_IS_ACTIVATED(task)) {
                task->tk_active = 1;
                rpc_set_sleeping(task);
        }

        status = __rpc_add_wait_queue(q, task);
        if (status) {
                printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
                task->tk_status = status;
        } else {
                rpc_clear_running(task);
                if (task->tk_callback) {
                        dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid);
                        BUG();
                }
                task->tk_callback = action;
                __rpc_add_timer(task, timer);
        }
}

void
rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                                rpc_action action, rpc_action timer)
{
        /*
         * Protect the queue operations.
         */
        spin_lock_bh(&rpc_queue_lock);
        __rpc_sleep_on(q, task, action, timer);
        spin_unlock_bh(&rpc_queue_lock);
}
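
/*
 * Illustrative sketch (not part of the original file): a typical caller
 * pairs rpc_sleep_on() with a later rpc_wake_up_task() (or lets the timer
 * fire). The queue, action and function names below are hypothetical and
 * chosen only for illustration; the real callers live in xprt.c and clnt.c.
 *
 *      static void example_wait_for_reply(struct rpc_task *task)
 *      {
 *              task->tk_timeout = 5 * HZ;
 *              // Sleep until someone calls rpc_wake_up_task(task), or
 *              // rpc_wake_up()/rpc_wake_up_next() on example_queue; if the
 *              // timer expires first, __rpc_default_timer() sets -ETIMEDOUT
 *              // and wakes the task. example_resume runs as tk_callback.
 *              rpc_sleep_on(&example_queue, task, example_resume, NULL);
 *      }
 */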

/**
 * __rpc_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold rpc_queue_lock
 */
static void
__rpc_wake_up_task(struct rpc_task *task)
{
        dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n",
                                        task->tk_pid, jiffies, rpc_inhibit);

#ifdef RPC_DEBUG
        if (task->tk_magic != 0xf00baa) {
                printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
                rpc_debug = ~0;
                rpc_show_tasks();
                return;
        }
#endif
        /* Has the task been executed yet? If not, we cannot wake it up! */
        if (!RPC_IS_ACTIVATED(task)) {
                printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
                return;
        }
        if (RPC_IS_RUNNING(task))
                return;

        __rpc_disable_timer(task);
        if (task->tk_rpcwait != &schedq)
                __rpc_remove_wait_queue(task);

        rpc_make_runnable(task);

        dprintk("RPC:      __rpc_wake_up_task done\n");
}

/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
        dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
        task->tk_status = -ETIMEDOUT;
        rpc_wake_up_task(task);
}

/*
 * Wake up the specified task
 */
void
rpc_wake_up_task(struct rpc_task *task)
{
        if (RPC_IS_RUNNING(task))
                return;
        spin_lock_bh(&rpc_queue_lock);
        __rpc_wake_up_task(task);
        spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *
rpc_wake_up_next(struct rpc_wait_queue *queue)
{
        struct rpc_task *task = NULL;

        dprintk("RPC:      wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
        spin_lock_bh(&rpc_queue_lock);
        task_for_first(task, &queue->tasks)
                __rpc_wake_up_task(task);
        spin_unlock_bh(&rpc_queue_lock);

        return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs rpc_queue_lock
 */
void
rpc_wake_up(struct rpc_wait_queue *queue)
{
        struct rpc_task *task;

        spin_lock_bh(&rpc_queue_lock);
        while (!list_empty(&queue->tasks))
                task_for_first(task, &queue->tasks)
                        __rpc_wake_up_task(task);
        spin_unlock_bh(&rpc_queue_lock);
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs rpc_queue_lock
 */
void
rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
        struct rpc_task *task;

        spin_lock_bh(&rpc_queue_lock);
        while (!list_empty(&queue->tasks)) {
                task_for_first(task, &queue->tasks) {
                        task->tk_status = status;
                        __rpc_wake_up_task(task);
                }
        }
        spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Run a task at a later time
 */
static void     __rpc_atrun(struct rpc_task *);
void
rpc_delay(struct rpc_task *task, unsigned long delay)
{
        task->tk_timeout = delay;
        rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}

static void
__rpc_atrun(struct rpc_task *task)
{
        task->tk_status = 0;
        rpc_wake_up_task(task);
}
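
/*
 * Illustrative sketch (not part of the original file): rpc_delay() is how a
 * caller implements a delayed retry, e.g. backing off after a temporary
 * error. The example_retry_later()/example_retry_step names below are
 * hypothetical and used only for illustration.
 *
 *      static void example_retry_later(struct rpc_task *task)
 *      {
 *              // Resume at the next FSM step after roughly 2 seconds:
 *              // the task parks on delay_queue, then __rpc_atrun() clears
 *              // tk_status and wakes it up when the timer fires.
 *              task->tk_action = example_retry_step;
 *              rpc_delay(task, 2 * HZ);
 *      }
 */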

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static int
__rpc_execute(struct rpc_task *task)
{
        int             status = 0;

        dprintk("RPC: %4d rpc_execute flgs %x\n",
                                task->tk_pid, task->tk_flags);

        if (!RPC_IS_RUNNING(task)) {
                printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
                return 0;
        }

 restarted:
        while (1) {
                /*
                 * Execute any pending callback.
                 */
                if (RPC_DO_CALLBACK(task)) {
                        /* Define a callback save pointer */
                        void (*save_callback)(struct rpc_task *);

                        /*
                         * If a callback exists, save it, clear it,
                         * then call it.
                         * Saving it first ensures we don't clobber a
                         * new callback set from within the handler itself.
                         * - Dave
                         */
                        save_callback = task->tk_callback;
                        task->tk_callback = NULL;
                        save_callback(task);
                }

                /*
                 * Perform the next FSM step.
                 * tk_action may be NULL when the task has been killed
                 * by someone else.
                 */
                if (RPC_IS_RUNNING(task)) {
                        /*
                         * Garbage collection of pending timers...
                         */
                        rpc_delete_timer(task);
                        if (!task->tk_action)
                                break;
                        task->tk_action(task);
                }

                /*
                 * Check whether task is sleeping.
                 */
                spin_lock_bh(&rpc_queue_lock);
                if (!RPC_IS_RUNNING(task)) {
                        rpc_set_sleeping(task);
                        if (RPC_IS_ASYNC(task)) {
                                spin_unlock_bh(&rpc_queue_lock);
                                return 0;
                        }
                }
                spin_unlock_bh(&rpc_queue_lock);

                while (RPC_IS_SLEEPING(task)) {
                        /* sync task: sleep here */
                        dprintk("RPC: %4d sync task going to sleep\n",
                                                        task->tk_pid);
                        if (current->pid == rpciod_pid)
                                printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");

                        __wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
                        dprintk("RPC: %4d sync task resuming\n", task->tk_pid);

                        /*
                         * When a sync task receives a signal, it exits with
                         * -ERESTARTSYS. In order to catch any callbacks that
                         * clean up after sleeping on some queue, we don't
                         * break the loop here, but go around once more.
                         */
                        if (task->tk_client->cl_intr && signalled()) {
                                dprintk("RPC: %4d got signal\n", task->tk_pid);
                                task->tk_flags |= RPC_TASK_KILLED;
                                rpc_exit(task, -ERESTARTSYS);
                                rpc_wake_up_task(task);
                        }
                }
        }

        if (task->tk_exit) {
                task->tk_exit(task);
                /* If tk_action is non-null, the user wants us to restart */
                if (task->tk_action) {
                        if (!RPC_ASSASSINATED(task)) {
                                /* Release RPC slot and buffer memory */
                                if (task->tk_rqstp)
                                        xprt_release(task);
                                if (task->tk_buffer) {
                                        rpc_free(task->tk_buffer);
                                        task->tk_buffer = NULL;
                                }
                                goto restarted;
                        }
                        printk(KERN_ERR "RPC: dead task tries to walk away.\n");
                }
        }

        dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
        status = task->tk_status;

        /* Release all resources associated with the task */
        rpc_release_task(task);

        return status;
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *       released. In particular note that tk_release() will have
 *       been called, so your task memory may have been freed.
 */
int
rpc_execute(struct rpc_task *task)
{
        int status = -EIO;
        if (rpc_inhibit) {
                printk(KERN_INFO "RPC: execution inhibited!\n");
                goto out_release;
        }

        status = -EWOULDBLOCK;
        if (task->tk_active) {
                printk(KERN_ERR "RPC: active task was run twice!\n");
                goto out_err;
        }

        task->tk_active = 1;
        rpc_set_running(task);
        return __rpc_execute(task);
 out_release:
        rpc_release_task(task);
 out_err:
        return status;
}
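
/*
 * Illustrative sketch (not part of the original file): roughly how a
 * synchronous caller drives a task through rpc_execute(). The
 * example_call() and example_first_action names are hypothetical; the
 * real entry points and the real first FSM action live in clnt.c.
 *
 *      static int example_call(struct rpc_clnt *clnt)
 *      {
 *              struct rpc_task *task;
 *
 *              task = rpc_new_task(clnt, NULL, 0);      // synchronous task
 *              if (!task)
 *                      return -ENOMEM;
 *              task->tk_action = example_first_action;  // first FSM step
 *              // Runs the FSM to completion, sleeping on tk_wait whenever
 *              // the task is put to sleep, then releases the task and
 *              // returns its final tk_status.
 *              return rpc_execute(task);
 *      }
 */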

/*
 * This is our own little scheduler for async RPC tasks.
 */
static void
__rpc_schedule(void)
{
        struct rpc_task *task;
        int             count = 0;

        dprintk("RPC:      rpc_schedule enter\n");
        while (1) {
                spin_lock_bh(&rpc_queue_lock);

                task_for_first(task, &schedq.tasks) {
                        __rpc_remove_wait_queue(task);
                        spin_unlock_bh(&rpc_queue_lock);

                        __rpc_execute(task);
                } else {
                        spin_unlock_bh(&rpc_queue_lock);
                        break;
                }

                if (++count >= 200 || current->need_resched) {
                        count = 0;
                        schedule();
                }
        }
        dprintk("RPC:      rpc_schedule leave\n");
}

/*
 * Allocate memory for RPC purpose.
 *
 * This is yet another tricky issue: For sync requests issued by
 * a user process, we want to make kmalloc sleep if there isn't
 * enough memory. Async requests should not sleep too excessively
 * because that will block rpciod (but that's not dramatic when
 * it's starved of memory anyway). Finally, swapout requests should
 * never sleep at all, and should not trigger another swap_out
 * request through kmalloc which would just increase memory contention.
 *
 * I hope the following gets it right, which gives async requests
 * a slight advantage over sync requests (good for writeback, debatable
 * for readahead):
 *
 *   sync user requests:        GFP_KERNEL
 *   async requests:            GFP_RPC         (== GFP_NOFS)
 *   swap requests:             GFP_ATOMIC      (or new GFP_SWAPPER)
 */
void *
rpc_allocate(unsigned int flags, unsigned int size)
{
        u32     *buffer;
        int     gfp;

        if (flags & RPC_TASK_SWAPPER)
                gfp = GFP_ATOMIC;
        else if (flags & RPC_TASK_ASYNC)
                gfp = GFP_RPC;
        else
                gfp = GFP_KERNEL;

        do {
                if ((buffer = (u32 *) kmalloc(size, gfp)) != NULL) {
                        dprintk("RPC:      allocated buffer %p\n", buffer);
                        return buffer;
                }
                if ((flags & RPC_TASK_SWAPPER) && size <= sizeof(swap_buffer)
                    && rpc_lock_swapbuf()) {
                        dprintk("RPC:      used last-ditch swap buffer\n");
                        return swap_buffer;
                }
                if (flags & RPC_TASK_ASYNC)
                        return NULL;
                yield();
        } while (!signalled());

        return NULL;
}

void
rpc_free(void *buffer)
{
        if (buffer != swap_buffer) {
                kfree(buffer);
                return;
        }
        rpc_unlock_swapbuf();
}
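
/*
 * Illustrative sketch (not part of the original file): callers pass their
 * task's tk_flags so the GFP policy above applies, and must release the
 * buffer with rpc_free() so the last-ditch swap buffer gets unlocked.
 * The example_setup_buffer() name and bufsiz parameter are hypothetical.
 *
 *      static int example_setup_buffer(struct rpc_task *task, unsigned int bufsiz)
 *      {
 *              task->tk_buffer = rpc_allocate(task->tk_flags, bufsiz);
 *              if (!task->tk_buffer)
 *                      return -ENOMEM;
 *              // ... use the buffer for the call; later the owner does:
 *              //      rpc_free(task->tk_buffer);
 *              //      task->tk_buffer = NULL;
 *              // (rpc_release_task() also does this cleanup.)
 *              return 0;
 *      }
 */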

/*
 * Creation and deletion of RPC task structures
 */
inline void
rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
                                rpc_action callback, int flags)
{
        memset(task, 0, sizeof(*task));
        init_timer(&task->tk_timer);
        task->tk_timer.data     = (unsigned long) task;
        task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
        task->tk_client = clnt;
        task->tk_flags  = flags;
        task->tk_exit   = callback;
        init_waitqueue_head(&task->tk_wait);
        if (current->uid != current->fsuid || current->gid != current->fsgid)
                task->tk_flags |= RPC_TASK_SETUID;

        /* Initialize retry counters */
        task->tk_garb_retry = 2;
        task->tk_cred_retry = 2;
        task->tk_suid_retry = 1;

        /* Add to global list of all tasks */
        spin_lock(&rpc_sched_lock);
        list_add(&task->tk_task, &all_tasks);
        spin_unlock(&rpc_sched_lock);

        if (clnt)
                atomic_inc(&clnt->cl_users);

#ifdef RPC_DEBUG
        task->tk_magic = 0xf00baa;
        task->tk_pid = rpc_task_id++;
#endif
        dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
                                current->pid);
}

static void
rpc_default_free_task(struct rpc_task *task)
{
        dprintk("RPC: %4d freeing task\n", task->tk_pid);
        rpc_free(task);
}

/*
 * Create a new task for the specified client.  We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *
rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
{
        struct rpc_task *task;

        task = (struct rpc_task *) rpc_allocate(flags, sizeof(*task));
        if (!task)
                goto cleanup;

        rpc_init_task(task, clnt, callback, flags);

        /* Replace tk_release */
        task->tk_release = rpc_default_free_task;

        dprintk("RPC: %4d allocated task\n", task->tk_pid);
        task->tk_flags |= RPC_TASK_DYNAMIC;
out:
        return task;

cleanup:
        /* Check whether to release the client */
        if (clnt) {
                printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
                        atomic_read(&clnt->cl_users), clnt->cl_oneshot);
                atomic_inc(&clnt->cl_users); /* pretend we were used ... */
                rpc_release_client(clnt);
        }
        goto out;
}

void
rpc_release_task(struct rpc_task *task)
{
        dprintk("RPC: %4d release task\n", task->tk_pid);

#ifdef RPC_DEBUG
        if (task->tk_magic != 0xf00baa) {
                printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
                rpc_debug = ~0;
                rpc_show_tasks();
                return;
        }
#endif

        /* Remove from global task list */
        spin_lock(&rpc_sched_lock);
        list_del(&task->tk_task);
        spin_unlock(&rpc_sched_lock);

        /* Protect the execution below. */
        spin_lock_bh(&rpc_queue_lock);

        /* Disable timer to prevent zombie wakeup */
        __rpc_disable_timer(task);

        /* Remove from any wait queue we're still on */
        __rpc_remove_wait_queue(task);

        task->tk_active = 0;

        spin_unlock_bh(&rpc_queue_lock);

        /* Synchronously delete any running timer */
        rpc_delete_timer(task);

        /* Release resources */
        if (task->tk_rqstp)
                xprt_release(task);
        if (task->tk_msg.rpc_cred)
                rpcauth_unbindcred(task);
        if (task->tk_buffer) {
                rpc_free(task->tk_buffer);
                task->tk_buffer = NULL;
        }
        if (task->tk_client) {
                rpc_release_client(task->tk_client);
                task->tk_client = NULL;
        }

#ifdef RPC_DEBUG
        task->tk_magic = 0;
#endif
        if (task->tk_release)
                task->tk_release(task);
}

/**
 * rpc_find_parent - find the parent of a child task.
 * @child: child task
 *
 * Checks that the parent task is still sleeping on the
 * queue 'childq'. If so returns a pointer to the parent.
 * Upon failure returns NULL.
 *
 * Caller must hold rpc_queue_lock
 */
static inline struct rpc_task *
rpc_find_parent(struct rpc_task *child)
{
        struct rpc_task *task, *parent;
        struct list_head *le;

        parent = (struct rpc_task *) child->tk_calldata;
        task_for_each(task, le, &childq.tasks)
                if (task == parent)
                        return parent;

        return NULL;
}

static void
rpc_child_exit(struct rpc_task *child)
{
        struct rpc_task *parent;

        spin_lock_bh(&rpc_queue_lock);
        if ((parent = rpc_find_parent(child)) != NULL) {
                parent->tk_status = child->tk_status;
                __rpc_wake_up_task(parent);
        }
        spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Note: rpc_new_task releases the client after a failure.
 */
struct rpc_task *
rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
{
        struct rpc_task *task;

        task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
        if (!task)
                goto fail;
        task->tk_exit = rpc_child_exit;
        task->tk_calldata = parent;
        return task;

fail:
        parent->tk_status = -ENOMEM;
        return NULL;
}

void
rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
{
        spin_lock_bh(&rpc_queue_lock);
        /* N.B. Is it possible for the child to have already finished? */
        __rpc_sleep_on(&childq, task, func, NULL);
        rpc_schedule_run(child);
        spin_unlock_bh(&rpc_queue_lock);
}
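
/*
 * Illustrative sketch (not part of the original file): the portmapper code
 * is the typical user of rpc_new_child()/rpc_run_child(). The
 * example_getport() and example_getport_done() names are hypothetical.
 *
 *      static void example_getport(struct rpc_task *task, struct rpc_clnt *pmap_clnt)
 *      {
 *              struct rpc_task *child;
 *
 *              child = rpc_new_child(pmap_clnt, task);
 *              if (!child)
 *                      return;         // parent's tk_status is now -ENOMEM
 *              // The parent sleeps on childq; when the child exits,
 *              // rpc_child_exit() copies the child's status to the parent
 *              // and wakes it, so example_getport_done() runs next.
 *              rpc_run_child(task, child, example_getport_done);
 *      }
 */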

/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void
rpc_killall_tasks(struct rpc_clnt *clnt)
{
        struct rpc_task *rovr;
        struct list_head *le;

        dprintk("RPC:      killing all tasks for client %p\n", clnt);

        /*
         * Spin lock all_tasks to prevent changes...
         */
        spin_lock(&rpc_sched_lock);
        alltask_for_each(rovr, le, &all_tasks)
                if (!clnt || rovr->tk_client == clnt) {
                        rovr->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(rovr, -EIO);
                        rpc_wake_up_task(rovr);
                }
        spin_unlock(&rpc_sched_lock);
}

static DECLARE_MUTEX_LOCKED(rpciod_running);

static inline int
rpciod_task_pending(void)
{
        return !list_empty(&schedq.tasks);
}


/*
 * This is the rpciod kernel thread
 */
static int
rpciod(void *ptr)
{
        wait_queue_head_t *assassin = (wait_queue_head_t*) ptr;
        int             rounds = 0;

        MOD_INC_USE_COUNT;
        lock_kernel();
        /*
         * Let our maker know we're running ...
         */
        rpciod_pid = current->pid;
        up(&rpciod_running);

        daemonize();

        spin_lock_irq(&current->sigmask_lock);
        siginitsetinv(&current->blocked, sigmask(SIGKILL));
        recalc_sigpending(current);
        spin_unlock_irq(&current->sigmask_lock);

        strcpy(current->comm, "rpciod");

        dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
        while (rpciod_users) {
                if (signalled()) {
                        rpciod_killall();
                        flush_signals(current);
                }
                __rpc_schedule();

                if (++rounds >= 64) {   /* safeguard */
                        schedule();
                        rounds = 0;
                }

                if (!rpciod_task_pending()) {
                        dprintk("RPC: rpciod back to sleep\n");
                        wait_event_interruptible(rpciod_idle, rpciod_task_pending());
                        dprintk("RPC: switch to rpciod\n");
                        rounds = 0;
                }
        }

        dprintk("RPC: rpciod shutdown commences\n");
        if (!list_empty(&all_tasks)) {
                printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
                rpciod_killall();
        }

        rpciod_pid = 0;
        wake_up(assassin);

        dprintk("RPC: rpciod exiting\n");
        MOD_DEC_USE_COUNT;
        return 0;
}

static void
rpciod_killall(void)
{
        unsigned long flags;

        while (!list_empty(&all_tasks)) {
                current->sigpending = 0;
                rpc_killall_tasks(NULL);
                __rpc_schedule();
                if (!list_empty(&all_tasks)) {
                        dprintk("rpciod_killall: waiting for tasks to exit\n");
                        yield();
                }
        }

        spin_lock_irqsave(&current->sigmask_lock, flags);
        recalc_sigpending(current);
        spin_unlock_irqrestore(&current->sigmask_lock, flags);
}

/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
        int error = 0;

        MOD_INC_USE_COUNT;
        down(&rpciod_sema);
        dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users);
        rpciod_users++;
        if (rpciod_pid)
                goto out;
        /*
         * If there's no pid, we should be the first user.
         */
        if (rpciod_users > 1)
                printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users);
        /*
         * Create the rpciod thread and wait for it to start.
         */
        error = kernel_thread(rpciod, &rpciod_killer, 0);
        if (error < 0) {
                printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error);
                rpciod_users--;
                goto out;
        }
        down(&rpciod_running);
        error = 0;
out:
        up(&rpciod_sema);
        MOD_DEC_USE_COUNT;
        return error;
}
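
/*
 * Illustrative sketch (not part of the original file): rpciod_up() and
 * rpciod_down() are reference-counted, so every successful rpciod_up()
 * must be balanced by exactly one rpciod_down(). A subsystem that needs
 * async RPC service brackets its lifetime with the pair, roughly:
 *
 *      // while bringing the subsystem up:
 *      error = rpciod_up();
 *      if (error)
 *              goto out_fail;
 *      ...
 *      // and while tearing it down:
 *      rpciod_down();          // last caller stops the rpciod thread
 */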

void
rpciod_down(void)
{
        unsigned long flags;

        MOD_INC_USE_COUNT;
        down(&rpciod_sema);
        dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users);
        if (rpciod_users) {
                if (--rpciod_users)
                        goto out;
        } else
                printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid);

        if (!rpciod_pid) {
                dprintk("rpciod_down: Nothing to do!\n");
                goto out;
        }

        kill_proc(rpciod_pid, SIGKILL, 1);
        /*
         * Usually rpciod will exit very quickly, so we
         * wait briefly before checking the process id.
         */
        current->sigpending = 0;
        yield();
        /*
         * Display a message if we're going to wait longer.
         */
        while (rpciod_pid) {
                dprintk("rpciod_down: waiting for pid %d to exit\n", rpciod_pid);
                if (signalled()) {
                        dprintk("rpciod_down: caught signal\n");
                        break;
                }
                interruptible_sleep_on(&rpciod_killer);
        }
        spin_lock_irqsave(&current->sigmask_lock, flags);
        recalc_sigpending(current);
        spin_unlock_irqrestore(&current->sigmask_lock, flags);
out:
        up(&rpciod_sema);
        MOD_DEC_USE_COUNT;
}

#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
        struct list_head *le;
        struct rpc_task *t;

        spin_lock(&rpc_sched_lock);
        if (list_empty(&all_tasks)) {
                spin_unlock(&rpc_sched_lock);
                return;
        }
        printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
                "-rpcwait -action- --exit--\n");
        alltask_for_each(t, le, &all_tasks)
                printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
                        t->tk_pid, t->tk_msg.rpc_proc, t->tk_flags, t->tk_status,
                        t->tk_client, t->tk_client->cl_prog,
                        t->tk_rqstp, t->tk_timeout,
                        t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ",
                        t->tk_action, t->tk_exit);
        spin_unlock(&rpc_sched_lock);
}
#endif