[AF_RXRPC]: Add an interface to the AF_RXRPC module for the AFS filesystem to use
[powerpc.git] / net / rxrpc / af_rxrpc.c
index bfa8822..2c57df9 100644 (file)
@@ -41,6 +41,8 @@ atomic_t rxrpc_debug_id;
 /* count of skbs currently in use */
 atomic_t rxrpc_n_skbs;
 
+struct workqueue_struct *rxrpc_workqueue;
+
 static void rxrpc_sock_destructor(struct sock *);
 
 /*
@@ -214,7 +216,8 @@ static int rxrpc_listen(struct socket *sock, int backlog)
  */
 static struct rxrpc_transport *rxrpc_name_to_transport(struct socket *sock,
                                                       struct sockaddr *addr,
-                                                      int addr_len, int flags)
+                                                      int addr_len, int flags,
+                                                      gfp_t gfp)
 {
        struct sockaddr_rxrpc *srx = (struct sockaddr_rxrpc *) addr;
        struct rxrpc_transport *trans;
@@ -232,17 +235,129 @@ static struct rxrpc_transport *rxrpc_name_to_transport(struct socket *sock,
                return ERR_PTR(-EAFNOSUPPORT);
 
        /* find a remote transport endpoint from the local one */
-       peer = rxrpc_get_peer(srx, GFP_KERNEL);
+       peer = rxrpc_get_peer(srx, gfp);
        if (IS_ERR(peer))
                return ERR_PTR(PTR_ERR(peer));
 
        /* find a transport */
-       trans = rxrpc_get_transport(rx->local, peer, GFP_KERNEL);
+       trans = rxrpc_get_transport(rx->local, peer, gfp);
        rxrpc_put_peer(peer);
        _leave(" = %p", trans);
        return trans;
 }
 
+/**
+ * rxrpc_kernel_begin_call - Allow a kernel service to begin a call
+ * @sock: The socket on which to make the call
+ * @srx: The address of the peer to contact (defaults to socket setting)
+ * @key: The security context to use (defaults to socket setting)
+ * @user_call_ID: The ID to use
+ *
+ * Allow a kernel service to begin a call on the nominated socket.  This just
+ * sets up all the internal tracking structures and allocates connection and
+ * call IDs as appropriate.  The call to be used is returned.
+ *
+ * The default socket destination address and security may be overridden by
+ * supplying @srx and @key.
+ */
+struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
+                                          struct sockaddr_rxrpc *srx,
+                                          struct key *key,
+                                          unsigned long user_call_ID,
+                                          gfp_t gfp)
+{
+       struct rxrpc_conn_bundle *bundle;
+       struct rxrpc_transport *trans;
+       struct rxrpc_call *call;
+       struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
+       __be16 service_id;
+
+       _enter(",,%x,%lx", key_serial(key), user_call_ID);
+
+       lock_sock(&rx->sk);
+
+       if (srx) {
+               trans = rxrpc_name_to_transport(sock, (struct sockaddr *) srx,
+                                               sizeof(*srx), 0, gfp);
+               if (IS_ERR(trans)) {
+                       call = ERR_PTR(PTR_ERR(trans));
+                       trans = NULL;
+                       goto out;
+               }
+       } else {
+               trans = rx->trans;
+               if (!trans) {
+                       call = ERR_PTR(-ENOTCONN);
+                       goto out;
+               }
+               atomic_inc(&trans->usage);
+       }
+
+       service_id = rx->service_id;
+       if (srx)
+               service_id = htons(srx->srx_service);
+
+       if (!key)
+               key = rx->key;
+       if (key && !key->payload.data)
+               key = NULL; /* a no-security key */
+
+       bundle = rxrpc_get_bundle(rx, trans, key, service_id, gfp);
+       if (IS_ERR(bundle)) {
+               call = ERR_PTR(PTR_ERR(bundle));
+               goto out;
+       }
+
+       call = rxrpc_get_client_call(rx, trans, bundle, user_call_ID, true,
+                                    gfp);
+       rxrpc_put_bundle(trans, bundle);
+out:
+       rxrpc_put_transport(trans);
+       release_sock(&rx->sk);
+       _leave(" = %p", call);
+       return call;
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_begin_call);
+
+/**
+ * rxrpc_kernel_end_call - Allow a kernel service to end a call it was using
+ * @call: The call to end
+ *
+ * Allow a kernel service to end a call it was using.  The call must be
+ * complete before this is called (the call should be aborted if necessary).
+ */
+void rxrpc_kernel_end_call(struct rxrpc_call *call)
+{
+       _enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
+       rxrpc_remove_user_ID(call->socket, call);
+       rxrpc_put_call(call);
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_end_call);
+
+/**
+ * rxrpc_kernel_intercept_rx_messages - Intercept received RxRPC messages
+ * @sock: The socket to intercept received messages on
+ * @interceptor: The function to pass the messages to
+ *
+ * Allow a kernel service to intercept messages heading for the Rx queue on an
+ * RxRPC socket.  They get passed to the specified function instead.
+ * @interceptor should free the socket buffers it is given.  @interceptor is
+ * called with the socket receive queue spinlock held and softirqs disabled -
+ * this ensures that the messages will be delivered in the right order.
+ */
+void rxrpc_kernel_intercept_rx_messages(struct socket *sock,
+                                       rxrpc_interceptor_t interceptor)
+{
+       struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
+
+       _enter("");
+       rx->interceptor = interceptor;
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_intercept_rx_messages);
+
 /*
  * connect an RxRPC socket
  * - this just targets it at a specific destination; no actual connection
@@ -294,7 +409,8 @@ static int rxrpc_connect(struct socket *sock, struct sockaddr *addr,
                return -EBUSY; /* server sockets can't connect as well */
        }
 
-       trans = rxrpc_name_to_transport(sock, addr, addr_len, flags);
+       trans = rxrpc_name_to_transport(sock, addr, addr_len, flags,
+                                       GFP_KERNEL);
        if (IS_ERR(trans)) {
                release_sock(&rx->sk);
                _leave(" = %ld", PTR_ERR(trans));
@@ -344,7 +460,7 @@ static int rxrpc_sendmsg(struct kiocb *iocb, struct socket *sock,
        if (m->msg_name) {
                ret = -EISCONN;
                trans = rxrpc_name_to_transport(sock, m->msg_name,
-                                               m->msg_namelen, 0);
+                                               m->msg_namelen, 0, GFP_KERNEL);
                if (IS_ERR(trans)) {
                        ret = PTR_ERR(trans);
                        trans = NULL;
@@ -576,7 +692,7 @@ static int rxrpc_release_sock(struct sock *sk)
 
        /* try to flush out this socket */
        rxrpc_release_calls_on_socket(rx);
-       flush_scheduled_work();
+       flush_workqueue(rxrpc_workqueue);
        rxrpc_purge_queue(&sk->sk_receive_queue);
 
        if (rx->conn) {
@@ -673,15 +789,21 @@ static int __init af_rxrpc_init(void)
 
        rxrpc_epoch = htonl(xtime.tv_sec);
 
+       ret = -ENOMEM;
        rxrpc_call_jar = kmem_cache_create(
                "rxrpc_call_jar", sizeof(struct rxrpc_call), 0,
                SLAB_HWCACHE_ALIGN, NULL, NULL);
        if (!rxrpc_call_jar) {
                printk(KERN_NOTICE "RxRPC: Failed to allocate call jar\n");
-               ret = -ENOMEM;
                goto error_call_jar;
        }
 
+       rxrpc_workqueue = create_workqueue("krxrpcd");
+       if (!rxrpc_workqueue) {
+               printk(KERN_NOTICE "RxRPC: Failed to allocate work queue\n");
+               goto error_work_queue;
+       }
+
        ret = proto_register(&rxrpc_proto, 1);
         if (ret < 0) {
                 printk(KERN_CRIT "RxRPC: Cannot register protocol\n");
@@ -719,6 +841,8 @@ error_key_type:
 error_sock:
        proto_unregister(&rxrpc_proto);
 error_proto:
+       destroy_workqueue(rxrpc_workqueue);
+error_work_queue:
        kmem_cache_destroy(rxrpc_call_jar);
 error_call_jar:
        return ret;
@@ -743,9 +867,10 @@ static void __exit af_rxrpc_exit(void)
        ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);
 
        _debug("flush scheduled work");
-       flush_scheduled_work();
+       flush_workqueue(rxrpc_workqueue);
        proc_net_remove("rxrpc_conns");
        proc_net_remove("rxrpc_calls");
+       destroy_workqueue(rxrpc_workqueue);
        kmem_cache_destroy(rxrpc_call_jar);
        _leave("");
 }