Adding working but somewhat preliminary poll() support.
authorDerrik Pates <demon@now.ai>
Wed, 10 Aug 2011 02:00:03 +0000 (20:00 -0600)
committerDerrik Pates <demon@now.ai>
Wed, 10 Aug 2011 02:00:03 +0000 (20:00 -0600)
This is my current-state support for poll() (my three-quarter-assed
implementation of it anyway). I attempted to implement the poll handle
as an object, but ran into threading problems with that, so it evolved
in a slightly different direction.

Also had to add a small hack in S_fh_store_handle(), as when it
tried to use mg->mg_ptr to make the filehandle, the fsel.pl example
file ended up with all the file descriptors lost but the one. I'm not
currently sure why that's so; I've never seen that happen before, but
it seems to be threading related, as if I disable the 'threaded' flag
to Fuse::main(), it works. I think it may be an undiagnosed threading
interaction...

Fuse.pm
Fuse.xs
examples/fsel.pl [new file with mode: 0755]
examples/fselclient.pl [new file with mode: 0755]

diff --git a/Fuse.pm b/Fuse.pm
index 7a5b999..5a1f5d9 100755 (executable)
--- a/Fuse.pm
+++ b/Fuse.pm
@@ -25,6 +25,9 @@ our %EXPORT_TAGS = (
                    'ioctl' => [ qw(FUSE_IOCTL_COMPAT FUSE_IOCTL_UNRESTRICTED FUSE_IOCTL_RETRY FUSE_IOCTL_MAX_IOV) ],
                    );
 
+if (fuse_version() >= 2.8) {
+    push(@{$EXPORT_TAGS{'all'}}, qw(notify_poll pollhandle_destroy));
+
 our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } );
 
 our @EXPORT = ();
@@ -91,8 +94,7 @@ sub main {
                # arch with a 64 bit pointer will align everything to
                # 8 bytes, making the question of pointer alignment for
                # the last 2 wrapper functions no big thing.
-               push(@names, qw/junk ioctl/);
-#              push(@names, qw/junk ioctl poll/);
+               push(@names, qw/junk ioctl poll/);
        }
        my @subs = map {undef} @names;
        my $tmp = 0;
@@ -278,6 +280,39 @@ Indicates the Fuse version in use; more accurately, indicates the version
 of the Fuse API in use at build time. Returned as a decimal value; i.e.,
 for Fuse API v2.6, will return "2.6".
 
+=head3 Fuse::notify_poll
+
+Only available if the Fuse module is built against libfuse 2.8 or later.
+Use fuse_version() to determine if this is the case. Calling this function
+with a pollhandle argument (as provided to the C<poll> operation
+implementation) will send a notification to the caller poll()ing for
+I/O operation availability. If more than one pollhandle is provided for
+the same filehandle, only use the latest; you *can* send notifications
+to them all, but it is unnecessary and decreases performance.
+
+ONLY supply poll handles fed to you through C<poll> to this function.
+Due to thread safety requirements, we can't currently package the pointer
+up in an object the way we'd like to to prevent this situation, but your
+filesystem server program may segfault, or worse, if you feed things to
+this function which it is not supposed to receive. If you do anyway, we
+take no responsibility for whatever Bad Things(tm) may happen.
+
+=head3 Fuse::pollhandle_destroy
+
+Only available if the Fuse module is built against libfuse 2.8 or later.
+Use fuse_version() to determine if this is the case. This function destroys
+a poll handle (fed to your program through C<poll>). When you are done
+with a poll handle, either because it has been replaced, or because a
+notification has been sent to it, pass it to this function to dispose of
+it safely.
+
+ONLY supply poll handles fed to you through C<poll> to this function.
+Due to thread safety requirements, we can't currently package the pointer
+up in an object the way we'd like to to prevent this situation, but your
+filesystem server program may segfault, or worse, if you feed things to
+this function which it is not supposed to receive. If you do anyway, we
+take no responsibility for whatever Bad Things(tm) may happen.
+
 =head2 FUNCTIONS YOUR FILESYSTEM MAY IMPLEMENT
 
 =head3 getattr
@@ -692,6 +727,25 @@ Keep in mind that read and write are from the client perspective, so
 read from our end means data is going *out*, and write means data is
 coming *in*. It can be slightly confusing.
 
+=head1 poll
+
+Arguments: Pathname, poll handle ID (or undef if none), event mask, (optional) file handle
+
+Returns errno or 0 on success, and updated event mask on success
+
+Used to handle poll() operations on files. See poll(2) to learn more about
+event polling. Use IO::Poll to get the POLLIN, POLLOUT, and other symbols
+to describe the events which can happen on the filehandle. Save the poll
+handle ID to be passed to C<notify_poll> and C<pollhandle_destroy>
+functions, if it is not undef. Threading will likely be necessary for this
+operation to work.
+
+There is not an "out of band" data transfer channel provided as part of
+FUSE, so POLLPRI/POLLRDBAND/POLLWRBAND won't work.
+
+Poll handle is currently a read-only scalar; we are investigating a way
+to make this an object instead.
+
 =head1 AUTHOR
 
 Mark Glines, E<lt>mark@glines.orgE<gt>
diff --git a/Fuse.xs b/Fuse.xs
index 26b80e0..639adf8 100755 (executable)
--- a/Fuse.xs
+++ b/Fuse.xs
@@ -33,8 +33,7 @@
 
 #define MY_CXT_KEY "Fuse::_guts" XS_VERSION
 #if FUSE_VERSION >= 28
-# define N_CALLBACKS 40
-/* # define N_CALLBACKS 41 */
+# define N_CALLBACKS 41
 #elif FUSE_VERSION >= 26
 # define N_CALLBACKS 38
 #elif FUSE_VERSION >= 25
@@ -130,8 +129,10 @@ void S_fh_store_handle(pTHX_ pMY_CXT_ struct fuse_file_info *fi, SV *sv) {
                        SvSHARE(sv);
                }
 #endif
-               MAGIC *mg = (SvTYPE(sv) == SVt_PVMG) ? mg_find(sv, PERL_MAGIC_shared_scalar) : NULL;
-               fi->fh = mg ? PTR2IV(mg->mg_ptr) : PTR2IV(sv);
+        /* This seems to be screwing things up... */
+               // MAGIC *mg = (SvTYPE(sv) == SVt_PVMG) ? mg_find(sv, PERL_MAGIC_shared_scalar) : NULL;
+               // fi->fh = mg ? PTR2IV(mg->mg_ptr) : PTR2IV(sv);
+               fi->fh = PTR2IV(sv);
                if(hv_store_ent(MY_CXT.handles, FH_KEY(fi), SvREFCNT_inc(sv), 0) == NULL) {
                        SvREFCNT_dec(sv);
                }
@@ -1478,12 +1479,42 @@ int _PLfuse_ioctl(const char *file, int cmd, void *arg,
        return rv;
 }
 
-#if 0
 int _PLfuse_poll(const char *file, struct fuse_file_info *fi,
                  struct fuse_pollhandle *ph, unsigned *reventsp) {
-
+       int rv;
+       SV *sv = NULL;
+       FUSE_CONTEXT_PRE;
+       DEBUGf("poll begin\n");
+       ENTER;
+       SAVETMPS;
+       PUSHMARK(SP);
+       XPUSHs(sv_2mortal(newSVpv(file,0)));
+       if (ph) {
+        /* Still gotta figure out how to do this right... */
+               sv = newSViv(PTR2IV(ph));
+        SvREADONLY_on(sv);
+               SvSHARE(sv);
+               XPUSHs(sv);
+       }
+       else
+               XPUSHs(&PL_sv_undef);
+       XPUSHs(sv_2mortal(newSViv(*reventsp)));
+       XPUSHs(FH_GETHANDLE(fi));
+       PUTBACK;
+       rv = call_sv(MY_CXT.callback[40],G_ARRAY);
+       SPAGAIN;
+       if (rv > 1) {
+               *reventsp = POPi;
+        rv--;
+    }
+       rv = (rv ? POPi : 0);
+       FREETMPS;
+       LEAVE;
+       PUTBACK;
+       DEBUGf("poll end: %i\n", rv);
+       FUSE_CONTEXT_POST;
+       return rv;
 }
-#endif
 #endif /* FUSE_VERSION >= 28 */
 
 struct fuse_operations _available_ops = {
@@ -1533,9 +1564,7 @@ bmap:                     _PLfuse_bmap,
 #endif /* FUSE_VERSION >= 26 */
 #if FUSE_VERSION >= 28
 ioctl:                 _PLfuse_ioctl,
-#if 0
 poll:                  _PLfuse_poll,
-#endif
 #endif /* FUSE_VERSION >= 28 */
 };
 
@@ -1722,3 +1751,35 @@ perl_fuse_main(...)
                fuse_loop(fuse_new(fc,&args,&fops,sizeof(fops),NULL));
        fuse_unmount(mountpoint,fc);
        fuse_opt_free_args(&args);
+
+#if FUSE_VERSION >= 28
+
+void
+pollhandle_destroy(...)
+    PREINIT:
+       struct fuse_pollhandle *ph;
+    INIT:
+        if (items != 1) {
+            fprintf(stderr, "No pollhandle passed?\n");
+            XSRETURN_UNDEF;
+        }
+       CODE:
+        ph = INT2PTR(struct fuse_pollhandle*, SvIV(ST(0)));
+               fuse_pollhandle_destroy(ph);
+
+int 
+notify_poll(...)
+    PREINIT:
+        struct fuse_pollhandle *ph;
+    INIT:
+        if (items != 1) {
+            fprintf(stderr, "No pollhandle passed?\n");
+            XSRETURN_UNDEF;
+        }
+       CODE:
+        ph = INT2PTR(struct fuse_pollhandle*, SvIV(ST(0)));
+               RETVAL = fuse_notify_poll(ph);
+       OUTPUT:
+               RETVAL
+
+#endif
diff --git a/examples/fsel.pl b/examples/fsel.pl
new file mode 100755 (executable)
index 0000000..7ee7e2b
--- /dev/null
@@ -0,0 +1,199 @@
+#!/usr/bin/env perl
+
+use strict;
+no strict qw(refs);
+
+use threads;
+use threads::shared;
+
+use Carp;
+local $SIG{'__WARN__'} = \&Carp::cluck;
+
+use Fuse qw(:all);
+use Fcntl qw(:mode);
+use POSIX;
+use IO::Poll qw(POLLIN);
+use Time::HiRes qw(sleep);
+use Data::Dumper;
+
+use constant FSEL_CNT_MAX   => 10;
+use constant FSEL_FILES     => 16;
+
+my $fsel_open_mask :shared = 0;
+my $fsel_poll_notify_mask :shared = 0;
+my @fsel_poll_handle :shared;
+my @fsel_cnt :shared;
+map { $fsel_cnt[$_] = 0 } (0 .. (FSEL_FILES - 1));
+
+my $fsel_mutex :shared;
+
+sub fsel_path_index {
+    my ($path) = @_;
+    print 'called ', (caller(0))[3], "\n";
+
+    my $ch = substr($path, 1, 1);
+    if (length($path) != 2 || substr($path, 0, 1) ne '/' || 
+            $ch !~ /^[0-9A-F]$/) {
+        return -1;
+    }
+    return hex($ch);
+}
+
+sub fsel_getattr {
+    my ($path) = @_;
+    print 'called ', (caller(0))[3], "\n";
+    my @stbuf = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+    if ($path eq '/') {
+        $stbuf[2] = S_IFDIR | 0555;
+        $stbuf[3] = 2;
+        return @stbuf;
+    }
+
+    my $idx = fsel_path_index($path);
+    return -&ENOENT if $idx < 0;
+
+    $stbuf[2] = S_IFREG | 0444;
+    $stbuf[3] = 1;
+    $stbuf[7] = $fsel_cnt[$idx];
+    return @stbuf;
+}
+
+sub fsel_readdir {
+    my ($path, $offset) = @_;
+    print 'called ', (caller(0))[3], "\n";
+
+    return -&ENOENT if $path ne '/';
+
+    return('.', '..', map { sprintf('%X', $_) } (0 .. (FSEL_FILES - 1)), 0);
+}
+
+sub fsel_open {
+    my ($path, $flags, $info) = @_;
+    print 'called ', (caller(0))[3], "\n";
+
+    my $idx = fsel_path_index($path);
+    return -&ENOENT if $idx < 0;
+    return -&EACCES if $flags & 3 != O_RDONLY;
+    return -&EBUSY if $fsel_open_mask & (1 << $idx);
+    $fsel_open_mask |= (1 << $idx);
+
+    $info->{'direct_io'} = 1;
+    $info->{'nonseekable'} = 1;
+    print "fsel_open(): ", $idx, "\n";
+    my $foo = [ $idx + 0 ];
+    return (0, $foo->[0]);
+}
+
+sub fsel_release {
+    my ($path, $flags, $fh) = @_;
+    print 'called ', (caller(0))[3], "\n";
+
+    print "fsel_release(): \$fh is $fh\n";
+    $fsel_open_mask &= ~(1 << $fh);
+    printf("fsel_release(): \$fsel_open_mask is \%x\n", $fsel_open_mask);
+    return 0;
+}
+
+sub fsel_read {
+    my ($path, $size, $offset, $fh) = @_;
+    print 'called ', (caller(0))[3], "\n";
+    ## HACK
+    #$fh = fsel_path_index($path);
+    lock($fsel_mutex);
+
+    if ($fsel_cnt[$fh] < $size) {
+        $size = $fsel_cnt[$fh];
+    }
+    printf("READ   \%X transferred=\%u cnt=\%u\n", $fh, $size, $fsel_cnt[$fh]);
+    $fsel_cnt[$fh] -= $size;
+
+    return(chr($fh) x $size);
+}
+
+our $polled_zero :shared = 0;
+
+sub fsel_poll {
+    my ($path, $ph, $revents, $fh) = @_;
+    print 'called ', (caller(0))[3], "\n";
+    ## HACK
+    #$fh = fsel_path_index($path);
+
+    lock($fsel_mutex);
+
+    if ($ph) {
+        my $oldph = $fsel_poll_handle[$fh];
+        if ($oldph) {
+            pollhandle_destroy($oldph);
+        }
+        $fsel_poll_notify_mask |= (1 << $fh);
+        $fsel_poll_handle[$fh] = $ph;
+    }
+
+    if ($fsel_cnt[$fh]) {
+        $revents |= POLLIN;
+        printf("POLL   \%X cnt=\%u polled_zero=\%u\n", $fh, $fsel_cnt[$fh],
+                $polled_zero);
+        $polled_zero = 0;
+    }
+    else {
+        $polled_zero++;
+    }
+
+    return(0, $revents);
+}
+
+sub fsel_producer {
+    local $SIG{'KILL'} = sub { threads->exit(); };
+    print 'called ', (caller(0))[3], "\n";
+    my $tv = 0.25;
+    my $idx = 0;
+    my $nr = 1;
+
+    while (1) {
+        {
+            my ($i, $t);
+            lock($fsel_mutex);
+
+            for (($i, $t) = (0, $idx); $i < $nr; $i++,
+                    $t = (($t + int(FSEL_FILES / $nr)) % FSEL_FILES)) {
+                next if $fsel_cnt[$t] == FSEL_CNT_MAX;
+
+                $fsel_cnt[$t]++;
+                if ($fsel_poll_notify_mask & (1 << $t)) {
+                    printf("NOTIFY \%X\n", $t);
+                    my $ph = $fsel_poll_handle[$t];
+                    notify_poll($ph);
+                    pollhandle_destroy($ph);
+                    $fsel_poll_handle[$t] = undef;
+                    $fsel_poll_notify_mask &= ~(1 << $t);
+                }
+            }
+
+            $idx = ($idx + 1) % FSEL_FILES;
+            if ($idx == 0) {
+                $nr = ($nr * 2) % 7;
+            }
+        }
+
+        sleep($tv);
+    }
+}
+
+croak("Fuse doesn't have poll") unless Fuse::fuse_version() >= 2.8;
+
+my $thread = threads->create(\&fsel_producer);
+
+Fuse::main(
+    'mountpoint' => $ARGV[0],
+    'getattr'   => 'main::fsel_getattr',
+    'readdir'   => 'main::fsel_readdir',
+    'open'      => 'main::fsel_open',
+    'release'   => 'main::fsel_release',
+    'read'      => 'main::fsel_read',
+    'poll'      => 'main::fsel_poll',
+    'threaded'  => 1,
+);
+
+$thread->kill('KILL');
+$thread->join();
diff --git a/examples/fselclient.pl b/examples/fselclient.pl
new file mode 100755 (executable)
index 0000000..17ed1db
--- /dev/null
@@ -0,0 +1,41 @@
+#!/usr/bin/env perl
+
+use strict;
+no strict qw(refs);
+
+use Carp;
+local $SIG{'__WARN__'} = \&Carp::cluck;
+
+use IO::Poll qw(POLLIN);
+use Fcntl;
+use constant FSEL_FILES => 16;
+
+my @fds;
+
+foreach my $i (0 .. (FSEL_FILES - 1)) {
+    sysopen($fds[$i], $ARGV[0] . '/' . sprintf('%X', $i), O_RDONLY)
+        or croak($!);
+}
+
+my $poll = new IO::Poll;
+foreach my $fd (@fds) {
+    $poll->mask($fd, POLLIN);
+}
+while (1) {
+    my $rc = $poll->poll();
+
+    croak($!) if $rc < 0;
+
+    foreach my $i (0 .. (FSEL_FILES - 1)) {
+        if (!$poll->events($fds[$i])) {
+            print '_:   ';
+            next;
+        }
+        printf('%X:', $i);
+        $rc = sysread($fds[$i], my $buf, 4096);
+        croak($!) if !defined($rc);
+
+        printf('%02d ', $rc);
+    }
+    print "\n";
+}