[PATCH] dio: formalize bio counters as a dio reference count
[powerpc.git] / fs / direct-io.c
index b296942..bc1cbf9 100644 (file)
@@ -121,9 +121,8 @@ struct dio {
        int page_errors;                /* errno from get_user_pages() */
 
        /* BIO completion state */
+       atomic_t refcount;              /* direct_io_worker() and bios */
        spinlock_t bio_lock;            /* protects BIO fields below */
-       int bio_count;                  /* nr bios to be completed */
-       int bios_in_flight;             /* nr bios in flight */
        struct bio *bio_list;           /* singly linked via bi_private */
        struct task_struct *waiter;     /* waiting task (NULL if none) */
 
@@ -256,44 +255,27 @@ static int dio_complete(struct dio *dio, loff_t offset, int ret)
  * Called when a BIO has been processed.  If the count goes to zero then IO is
  * complete and we can signal this to the AIO layer.
  */
-static void finished_one_bio(struct dio *dio)
+static void dio_complete_aio(struct dio *dio)
 {
        unsigned long flags;
+       int ret;
 
-       spin_lock_irqsave(&dio->bio_lock, flags);
-       if (dio->bio_count == 1) {
-               if (dio->is_async) {
-                       int ret;
-
-                       /*
-                        * Last reference to the dio is going away.
-                        * Drop spinlock and complete the DIO.
-                        */
-                       spin_unlock_irqrestore(&dio->bio_lock, flags);
-
-                       ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+       ret = dio_complete(dio, dio->iocb->ki_pos, 0);
 
-                       /* Complete AIO later if falling back to buffered i/o */
-                       if (dio->result == dio->size ||
-                               ((dio->rw == READ) && dio->result)) {
-                               aio_complete(dio->iocb, ret, 0);
-                               kfree(dio);
-                               return;
-                       } else {
-                               /*
-                                * Falling back to buffered
-                                */
-                               spin_lock_irqsave(&dio->bio_lock, flags);
-                               dio->bio_count--;
-                               if (dio->waiter)
-                                       wake_up_process(dio->waiter);
-                               spin_unlock_irqrestore(&dio->bio_lock, flags);
-                               return;
-                       }
-               }
+       /* Complete AIO later if falling back to buffered i/o */
+       if (dio->result == dio->size ||
+               ((dio->rw == READ) && dio->result)) {
+               aio_complete(dio->iocb, ret, 0);
+               kfree(dio);
+       } else {
+               /*
+                * Falling back to buffered
+                */
+               spin_lock_irqsave(&dio->bio_lock, flags);
+               if (dio->waiter)
+                       wake_up_process(dio->waiter);
+               spin_unlock_irqrestore(&dio->bio_lock, flags);
        }
-       dio->bio_count--;
-       spin_unlock_irqrestore(&dio->bio_lock, flags);
 }
 
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
@@ -309,6 +291,10 @@ static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
 
        /* cleanup the bio */
        dio_bio_complete(dio, bio);
+
+       if (atomic_dec_and_test(&dio->refcount))
+               dio_complete_aio(dio);
+
        return 0;
 }
 
@@ -330,8 +316,7 @@ static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error)
        spin_lock_irqsave(&dio->bio_lock, flags);
        bio->bi_private = dio->bio_list;
        dio->bio_list = bio;
-       dio->bios_in_flight--;
-       if (dio->waiter && dio->bios_in_flight == 0)
+       if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
                wake_up_process(dio->waiter);
        spin_unlock_irqrestore(&dio->bio_lock, flags);
        return 0;
@@ -362,17 +347,15 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev,
  * In the AIO read case we speculatively dirty the pages before starting IO.
  * During IO completion, any of these pages which happen to have been written
  * back will be redirtied by bio_check_pages_dirty().
+ *
+ * bios hold a dio reference between submit_bio and ->end_io.
  */
 static void dio_bio_submit(struct dio *dio)
 {
        struct bio *bio = dio->bio;
-       unsigned long flags;
 
        bio->bi_private = dio;
-       spin_lock_irqsave(&dio->bio_lock, flags);
-       dio->bio_count++;
-       dio->bios_in_flight++;
-       spin_unlock_irqrestore(&dio->bio_lock, flags);
+       atomic_inc(&dio->refcount);
        if (dio->is_async && dio->rw == READ)
                bio_set_pages_dirty(bio);
        submit_bio(dio->rw, bio);
@@ -390,18 +373,28 @@ static void dio_cleanup(struct dio *dio)
                page_cache_release(dio_get_page(dio));
 }
 
+static int wait_for_more_bios(struct dio *dio)
+{
+       assert_spin_locked(&dio->bio_lock);
+
+       return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
+}
+
 /*
- * Wait for the next BIO to complete.  Remove it and return it.
+ * Wait for the next BIO to complete.  Remove it and return it.  NULL is
+ * returned once all BIOs have been completed.  This must only be called once
+ * all bios have been issued so that dio->refcount can only decrease.  This
+ * requires that that the caller hold a reference on the dio.
  */
 static struct bio *dio_await_one(struct dio *dio)
 {
        unsigned long flags;
-       struct bio *bio;
+       struct bio *bio = NULL;
 
        spin_lock_irqsave(&dio->bio_lock, flags);
-       while (dio->bio_list == NULL) {
+       while (wait_for_more_bios(dio)) {
                set_current_state(TASK_UNINTERRUPTIBLE);
-               if (dio->bio_list == NULL) {
+               if (wait_for_more_bios(dio)) {
                        dio->waiter = current;
                        spin_unlock_irqrestore(&dio->bio_lock, flags);
                        io_schedule();
@@ -410,8 +403,10 @@ static struct bio *dio_await_one(struct dio *dio)
                }
                set_current_state(TASK_RUNNING);
        }
-       bio = dio->bio_list;
-       dio->bio_list = bio->bi_private;
+       if (dio->bio_list) {
+               bio = dio->bio_list;
+               dio->bio_list = bio->bi_private;
+       }
        spin_unlock_irqrestore(&dio->bio_lock, flags);
        return bio;
 }
@@ -440,25 +435,24 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio)
                }
                bio_put(bio);
        }
-       finished_one_bio(dio);
        return uptodate ? 0 : -EIO;
 }
 
 /*
- * Wait on and process all in-flight BIOs.
+ * Wait on and process all in-flight BIOs.  This must only be called once
+ * all bios have been issued so that the refcount can only decrease.
+ * This just waits for all bios to make it through dio_bio_complete.  IO
+ * errors are propogated through dio->io_error and should be propogated via
+ * dio_complete().
  */
 static void dio_await_completion(struct dio *dio)
 {
-       /*
-        * The bio_lock is not held for the read of bio_count.
-        * This is ok since it is the dio_bio_complete() that changes
-        * bio_count.
-        */
-       while (dio->bio_count) {
-               struct bio *bio = dio_await_one(dio);
-               /* io errors are propogated through dio->io_error */
-               dio_bio_complete(dio, bio);
-       }
+       struct bio *bio;
+       do {
+               bio = dio_await_one(dio);
+               if (bio)
+                       dio_bio_complete(dio, bio);
+       } while (bio);
 }
 
 /*
@@ -995,16 +989,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
        dio->iocb = iocb;
        dio->i_size = i_size_read(inode);
 
-       /*
-        * BIO completion state.
-        *
-        * ->bio_count starts out at one, and we decrement it to zero after all
-        * BIOs are submitted.  This to avoid the situation where a really fast
-        * (or synchronous) device could take the count to zero while we're
-        * still submitting BIOs.
-        */
-       dio->bio_count = 1;
-       dio->bios_in_flight = 0;
+       atomic_set(&dio->refcount, 1);
        spin_lock_init(&dio->bio_lock);
        dio->bio_list = NULL;
        dio->waiter = NULL;
@@ -1111,7 +1096,11 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
                }
                if (ret == 0)
                        ret = dio->result;
-               finished_one_bio(dio);          /* This can free the dio */
+
+               /* this can free the dio */
+               if (atomic_dec_and_test(&dio->refcount))
+                       dio_complete_aio(dio);
+
                if (should_wait) {
                        unsigned long flags;
                        /*
@@ -1122,7 +1111,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 
                        spin_lock_irqsave(&dio->bio_lock, flags);
                        set_current_state(TASK_UNINTERRUPTIBLE);
-                       while (dio->bio_count) {
+                       while (atomic_read(&dio->refcount)) {
                                spin_unlock_irqrestore(&dio->bio_lock, flags);
                                io_schedule();
                                spin_lock_irqsave(&dio->bio_lock, flags);
@@ -1133,7 +1122,6 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
                        kfree(dio);
                }
        } else {
-               finished_one_bio(dio);
                dio_await_completion(dio);
 
                ret = dio_complete(dio, offset, ret);
@@ -1146,7 +1134,11 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
                         * i/o, we have to mark the the aio complete.
                         */
                        aio_complete(iocb, ret, 0);
-               kfree(dio);
+
+               if (atomic_dec_and_test(&dio->refcount))
+                       kfree(dio);
+               else
+                       BUG();
        }
        return ret;
 }