+++ /dev/null
-/*
- openisis - an open implementation of the CDS/ISIS database
- Version 0.8.x (patchlevel see file Version)
- Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
- see README for more information
-EOH */
-
-/*
- $Id: lsv.c,v 1.14 2003/06/11 14:53:26 kripke Exp $
- OpenIsis server
-*/
-
-#include <sys/select.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <netinet/in.h>
-#include <string.h> /* memcpy et al */
-#include <signal.h>
-#include <errno.h>
-
-
-#include "lsv.h"
-
-
-
-static Con *con[FD_SETSIZE]; /* usually 1024, but up to 1<<16 */
-#ifdef LSVTHR
-static pthread_mutex_t lsv_mutex_init = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t lsv_cond_init = PTHREAD_COND_INITIALIZER;
-static pthread_key_t lsv_key_wrk;
-#else
-static Wrk *lsv_wrk_single;
-#endif
-static int lsv_term;
-static siginfo_t lsv_sig;
-
-static const char * pdg[] = {
- "---",
- "NEW",
- "RED",
- "EOF"
-};
-static const char * stg[] = {
- "NEW",
- "CON",
- "INP",
- "SES",
- "WRK",
- "RUN",
- "COM",
- "DON",
- "RET"
-};
-
-static void lsv_sighand ( int sig, siginfo_t *info, void *ucontext )
-{
- if ( SIGINT == sig || SIGTERM == sig )
- lsv_term = !0;
- if ( info )
- lsv_sig = *info;
- else { /* ??? */
- lsv_sig.si_signo = sig;
- lsv_sig.si_pid = 0;
- }
- (void)ucontext;
-} /* lsv_sighand */
-
-
-static int lsv_enq ( Que *q, Con *c )
-{
- c->nxt = 0;
- q->len++;
- if ( q->tail ) {
- q->tail->nxt = c;
- q->tail = c;
- return 1;
- }
- q->head = q->tail = c;
- return 0;
-} /* lsv_enq */
-
-
-static void lsv_penq ( Srv *srv, Con *c )
-{
- Pool *work =
-#ifdef LSVTHR
- c->grp ? &srv->pool :
-#endif
- &srv->main;
- srv->jobs++;
- c->stg = LSV_STGWRK;
- if ( c->ses )
- c->ses->cur = c;
- lsv_enq( &work->que, c );
- sMsg( LOG_DEBUG, "enq req %d from %s qlen %d",
- LIO_FD&c->ios.file, c->nam, work->que.len );
-#ifdef LSVTHR
- if ( work->wait ) {
- Wrk *w = work->wait;
- work->wait = w->nxt;
- w->nxt = 0;
- srv->plen--;
- pthread_cond_signal( &w->todo );
- sMsg( LOG_DEBUG, "woke %d of %d", w->id, srv->plen );
- }
-#endif
-} /* lsv_penq */
-
-
-static Con *lsv_deq ( Que *q )
-{
- Con *c = q->head;
- if ( c ) {
- q->len--;
- q->head = c->nxt;
- c->nxt = 0;
- if ( q->tail == c )
- q->tail = 0;
- }
- return c;
-} /* lsv_deq */
-
-
-Wrk *svCur ()
-{
-#ifdef LSVTHR
- return (Wrk*)pthread_getspecific( lsv_key_wrk );
-#else
- return lsv_wrk_single;
-#endif
-}
-
-
-/**
- in threaded mode, run forever, waiting for jobs.
- non-threaded, do the jobs that are available and return
-*/
-static void *lsv_wrk ( void *arg )
-{
- Wrk *self = (Wrk*)arg;
- Srv *srv = self->srv;
- Con *job = 0;
- Pool *work =
-#ifdef LSVTHR
- self->id ? &srv->pool :
-#endif
- &srv->main;
-#ifdef LSVTHR
- sigset_t blk;
-
- /* block signals, they should go to the receiver */
- sigemptyset( &blk );
- sigaddset( &blk, SIGHUP );
- sigaddset( &blk, SIGINT );
- sigaddset( &blk, SIGTERM );
- sigaddset( &blk, SIGPIPE );
- pthread_sigmask( SIG_BLOCK, &blk, 0 );
-
- pthread_setspecific( lsv_key_wrk, arg );
- if ( self->id )
- srv->app( 0, LSV_APPINI );
-#else
- lsv_wrk_single = self;
- if ( !self->ses )
-#endif
- self->ses = sGet(); /* allocate session for thread */
-
- for (;/*ever*/;) {
- int ret;
-
- /* ****************************************
- critical section enter
- */
-#ifdef LSVTHR
- pthread_mutex_lock( &srv->mut );
-#endif
- if ( job ) { /* from last turn */
- sMsg( LOG_DEBUG, "thr %d enq job %s pdg %s",
- self->id, job->nam, pdg[job->pdg] );
- job->stg = LSV_STGRET;
- lsv_enq( &srv->recv, job );
-#ifdef LSVTHR
- if ( job->pdg ) /* wakeup receiver */
- write( srv->pip[1], &job->pdg, 1 );
-#endif
- if ( job->ses->que ) { /* immediatly reschedule queued job */
- Con *q = job->ses->que;
- job->ses->que = q->nxt;
- sMsg( LOG_VERBOSE, "dequeing con %s for ses '%s'",
- q->nam, job->ses->name );
- lsv_penq( srv, q );
- }
- }
- self->cur = 0;
- while ( !(job = lsv_deq( &work->que ))
- && !(LSV_SHUTDN & srv->flg)
- ) {
-#ifndef LSVTHR
- return self;
-#else
- self->nxt = work->wait;
- work->wait = self; /* pls pls lemme wrk */
- srv->plen++;
- self->waits++;
- sMsg( LOG_DEBUG, "thr %d waiting", self->id );
- /* wait releases the mutex */
- pthread_cond_wait( &self->todo, &srv->mut );
-#endif
- }
- self->cur = job;
- if ( job )
- job->stg = LSV_STGRUN;
-#ifdef LSVTHR
- pthread_mutex_unlock( &srv->mut );
-#endif
- /* ****************************************
- critical section leave
- */
-
- /* end of monday morning meeting blah blah -- start working */
- if ( ! job || (LSV_SHUTDN & srv->flg) )
- break;
- self->jobs++;
- if ( job->ses )
- sSet( job->ses );
- else
- job->ses = self->ses;
- job->ses->io[1] = &job->ios;
- if ( job->ses->res )
- CLRREC( job->ses->res );
- sMsg( LOG_DEBUG, "thr %d run job %s ses '%s'.%d",
- self->id, job->nam, job->ses->name, job->ses->accnt );
- ret = srv->app( job, LSV_APPRUN );
- sMsg( LOG_DEBUG, "thr %d got %d buf %d",
- self->id, ret, LIO_SAVAIL( &job->ios ) );
-
- /*
- lock only in order to create SMP memory barrier.
- on a single processor system, the byte should be safely readable
- by receiver without this lock.
- TOD: check alternatives:
- since receiver is checking the stage of active connections
- only in rare cases, it might be more efficient to guard either
- this set or the processing up to here with a per thread mutex.
- */
-#ifdef LSVTHR
- pthread_mutex_lock( &srv->mut );
-#endif
- job->stg = LSV_STGDON;
- job->ses->cur = 0;
-#ifdef LSVTHR
- pthread_mutex_unlock( &srv->mut );
-#endif
-
- ret = srv->prt( &job->ios, LIO_SCLOSE );
- job->ses->io[1] = 0;
- sMsg( LOG_DEBUG, "thr %d done job %s", self->id, job->nam );
- if ( job->ses != self->ses )
- sSet( self->ses ); /* reset session */
- }
-#ifndef LSVTHR
- if ( self->id )
- srv->app( 0, LSV_APPFIN );
-#endif
- return self;
-} /* lsv_wrk */
-
-
-static void *lsv_rcv ( void *arg )
-{
- Srv *srv = (Srv*)arg;
- fd_set fd;
- int fdlen = 0;
- int pending = 0;
- int ret = 0, sel, i;
-#ifdef LSVTHR
- int inlock = 0;
-#endif
-
- FD_ZERO( &fd );
- FD_SET( srv->lsn, &fd );
- fdlen = srv->lsn+1;
-#ifdef LSVTHR
- FD_SET( srv->pip[0], &fd );
- if ( fdlen < srv->pip[0]+1 )
- fdlen = srv->pip[0]+1;
-#endif
-
- for (;/*ever*/;) {
- struct sockaddr_in npeer; /* of new connection */
- int nsock = -1; /* new socket */
- Con *c;
- fd_set ready = fd;
- Tm expire;
-
- /* select on read only; writing is done in workers and
- exceptions (i.e. telnet OOB data) are not used
- */
-#ifndef LSVTHR
- if ( srv->recv.len ) { /* instead of listening on pip */
- memset( &ready, 0, sizeof(ready) );
- sel = 0;
- } else
-#endif
- if ( 0 > (sel = select( fdlen, &ready, 0, 0, 0 )) ) {
- if ( EINTR == errno ) {
- sMsg( LOG_DEBUG, "select woken by signal" );
- } else {
- ret = sMsg( LOG_SYSERR, "select" );
- goto done;
- }
- }
- if ( lsv_sig.si_signo ) {
- sMsg( LOG_DEBUG, "SIG%s (%d) from pid %d",
- SIGHUP==lsv_sig.si_signo ? "HUP" :
- SIGINT==lsv_sig.si_signo ? "INT" :
- SIGTERM==lsv_sig.si_signo ? "TERM" :
- SIGPIPE==lsv_sig.si_signo ? "PIPE" : "?",
- lsv_sig.si_signo, lsv_sig.si_pid );
- if ( 1 /*SIGHUP==lsv_sig.si_signo*/ ) {
- for ( i=FD_SETSIZE; i--; )
- if ( con[i] ) {
- c = con[i];
- sMsg( LOG_INFO,
- "con %3d peer %s stg %s prt %d app %d ios %d %s (0x%x)",
- i, c->nam, stg[c->stg], c->prt, c->app,
- c->ios.pos + c->ios.b.done,
- LIO_SISOPEN(&c->ios) ? "opn" : "clo", c->ios.file );
- }
- }
- if ( lsv_term ) /* even if it wasn't the last signal delivered */
- goto done;
- lsv_sig.si_signo = 0;
- /* must NOT access the set after interrupt,
- probably all fds are set
- */
- continue;
- }
- timeUpd( &srv->tim );
- timeGtfm( srv->gtm, &srv->tim );
- expire.millis = srv->tim.millis - srv->sto*LLL(1000);
-
-#ifdef LSVTHR
- pthread_mutex_lock( &srv->mut ); /* back in locked land */
- inlock = !0;
-#endif
-
- /* -- we're the acceptor */
- if ( FD_ISSET( srv->lsn, &ready ) ) { /* new connection */
- unsigned /*socklen_t is broken*/ alen = sizeof(npeer);
- nsock = accept( srv->lsn, (struct sockaddr*)&npeer, &alen );
- if ( 0 > nsock ) {
- ret = sMsg( LOG_SYSERR, "accept" );
- goto done;
- }
- if ( FD_SETSIZE <= nsock ) {
- ret = sMsg( ERR_BADF,
- "socket %d >= FD_SETSIZE %d", nsock, FD_SETSIZE );
- goto done;
- }
- if ( sizeof(npeer) != alen ) {
- ret = sMsg( ERR_INVAL, "bad peer len %d", alen );
- goto done;
- }
- /* ok */
- /* setsockopt
- SO_SNDTIMEO fixed value on linux
- SO_OOBINLINE ... hmm, don't care
- SO_LINGER off
- */
- /* prepare con */
- if ( !(c = con[nsock]) ) {
- c = con[nsock] = mAlloc( sizeof(*c) );
- c->ios.file = nsock; /* preliminary -- further setup below */
- }
- c->pdg = LSV_PDGNEW;
- switch ( c->stg ) {
- case LSV_STGCON: /* still in control of receiver */
- case LSV_STGINP:
- sMsg( ERR_IDIOT, "bad recv stage %d on new con", c->stg );
- c->stg = LSV_STGNEW;
- case LSV_STGNEW: /* clean idle socket -- pass to receiver */
- lsv_enq( &srv->recv, c );
- break;
- case LSV_STGSES: /* still in control of receiver */
- case LSV_STGWRK:
- sMsg( ERR_IDIOT, "bad worker stage %d on new con", c->stg );
- case LSV_STGRUN: /* worker will queue it */
- /* hmmm ... should have COM ... */
- case LSV_STGCOM:
- case LSV_STGDON:
- case LSV_STGRET: /* worker has queued it */
- pending++;
- break;
- }
- srv->conn++;
- FD_CLR( srv->lsn, &ready ); /* skip in fd check */
- } /* new connection */
-
-#ifdef LSVTHR
- if ( FD_ISSET( srv->pip[0], &ready ) ) { /* pending events */
- char trash[LSV_NUMWRK];
- read( srv->pip[0], trash, sizeof(trash) );
- FD_CLR( srv->pip[0], &ready );
- sel--;
- }
-#endif
-
- /* switching roles -- now we're the receiver */
- while ( (c = lsv_deq( &srv->recv )) ) { /* care for ready connections */
- int sock = LIO_FD & c->ios.file;
- sMsg( LOG_DEBUG, "deq %d from %s pdg %s",
- sock, c->nam, pdg[c->pdg] );
- if ( c->pdg && c->stg )
- pending--;
- if ( !(LIO_IN & c->ios.file) && (LIO_RDWR & c->ios.file) ) {
- /* cleanup closed socks */
- if ( c != con[sock] ) {
- ret = sMsg( ERR_IDIOT,
- "deq bad sock %d (0x%x)", sock, c->ios.file );
- goto done; /* PANIC time */
- }
- sMsg( LOG_DEBUG, "clos%s sock %d",
- (LIO_OUT & c->ios.file) ? "ing" : "ed", sock );
- if ( (LIO_OUT & c->ios.file) )
- lio_close( &c->ios.file, LIO_INOUT );
- c->ios.file &= ~LIO_RDWR; /* clear also, so we're not closing again */
- FD_CLR( sock, &fd );
- FD_CLR( sock, &ready ); /* probably an EOF to read */
- if ( fdlen == sock+1 )
- while ( fdlen
- /* && !FD_ISSET( fdlen-1, &fd ) risk of muting tmp. disabled */
- && (!con[fdlen-1] || !LIO_SISOPEN(&con[fdlen-1]->ios))
- )
- fdlen--;
- }
- if ( LSV_PDGNEW == c->pdg ) { /* new connection */
- struct sockaddr_in peer; /* of new connection */
- if ( nsock == sock ) /* the one accepted just above */
- peer = npeer;
- else {
- unsigned alen = sizeof(peer);
- if ( getpeername( sock, (struct sockaddr*)&peer, &alen )
- || sizeof(peer) != alen
- ) {
- ret = sMsg( LOG_SYSERR, "getpeername alen %d", alen );
- goto done;
- }
- }
- c->srv = srv;
- c->ses = 0;
- {
- unsigned host = (unsigned)ntohl( peer.sin_addr.s_addr );
- unsigned short port = (unsigned short)ntohs( peer.sin_port );
- char *p = c->nam;
- c->host = host;
- memset( c->nam, 0, sizeof(c->nam) );
- p += u2a( p, 0xff & (host >> 24) ); *p++ = '.';
- p += u2a( p, 0xff & (host >> 16) ); *p++ = '.';
- p += u2a( p, 0xff & (host >> 8) ); *p++ = '.';
- p += u2a( p, 0xff & (host ) ); *p++ = ':';
- p += u2a( p, port );
- /* *p++ = '@'; p += u2a( p, sock ); */
- }
- c->con++; /* never mind overflow ... */
- sMsg( LOG_INFO,
- "connect %d on %d from %s", c->con+1, sock, c->nam );
- LIO_SINIT( &c->ios, srv->prt, c->nam,
- sock | LIO_INOUT|LIO_RDWR|LIO_SOCK );
- LIO_BINIT( &c->flt );
- FD_SET( sock, &fd );
- if ( fdlen < sock+1 )
- fdlen = sock+1;
- c->bin = 0; /* do not reset on every request */
- }
- if ( ! (LIO_IN & c->ios.file) )
- c->stg = LSV_STGNEW;
- else {
- c->stg = LSV_STGCON;
- if ( ! FD_ISSET( sock, &fd ) ) {
- sMsg( LOG_DEBUG, "reenabling %s", c->nam );
- FD_SET( sock, &fd );
- if ( fdlen < sock+1 )
- fdlen = sock+1;
- }
- }
- c->grp = c->pdg = c->prt = c->app = 0;
- /* prepare for new request */
- c->ios.pos = 0;
- c->ios.b.done = c->ios.b.fill = 0;
- if ( c->req )
- CLRREC( c->req );
- else {
- /* this is done in the receiver rather than the acceptor
- since in multi-receiver model, receiver owns the req
- */
- c->req = mAlloc( LSV_BUFSIZ );
- OPENISIS_INITREC( c->req, LSV_BUFSIZ, 64 );
- }
- if ( !(LSV_CONSES & srv->flg) )
- c->ses = 0;
- } /* prepare connections */
-
-
- for ( i=fdlen; sel && i--; ) { /* read from the ready */
- Field *sid;
- int got;
- if ( ! FD_ISSET( i, &ready ) )
- continue;
- sel--;
- c = con[i];
- if ( ! c ) {
- ret = sMsg( ERR_IDIOT, "ran on empty con %d", i );
- goto done;
- }
- if ( LSV_STGINP < c->stg ) {
- sMsg( LOG_DEBUG, "con %d busy stage %d!", i, c->stg );
- c->pdg = LSV_PDGRED;
- pending++;
- FD_CLR( i, &fd ); /* mute this */
- continue;
- }
- c->ios.b.done = 0;
- got = lio_read( &c->ios.file, c->ios.b.c, sizeof(c->ios.b.c) );
- if ( 0 >= got ) {
- if ( -ERR_EOF != got )
- goto err;
- sMsg( LOG_DEBUG, "EOF on 0x%x line %d from %s",
- c->ios.file, c->req->len, c->nam );
- FD_CLR( i, &fd );
- if ( ! c->req->len ) /* initial eof */
- goto eof; /* close immediatly */
- /* lio_close( &c->ios.file, LIO_IN ); */
- c->ios.file &= ~LIO_IN; /* don't shut */
- c->pdg = LSV_PDGEOF; /* so it will be dequed and closed */
- pending++;
- got = 0; /* tell proto to close ... */
- }
- if ( ! c->req->len ) { /* stage should be CON */
- RADDS( c->req, -LSV_PEER, c->nam, 1 );
- sMsg( LOG_DEBUG, "req on %d from %s",
- LIO_FD&c->ios.file, c->nam );
- }
- c->stg = LSV_STGINP;
- c->ios.b.fill = got;
- got = srv->prt( &c->ios, LIO_SPUSH );
- sMsg( LOG_DEBUG, "got 0x%x from %s", got, c->nam );
- if ( ! got && (LIO_IN & c->ios.file) ) /* ok: more next time */
- continue;
- if ( 0 > got ) /* nok */
- goto err;
- got = c->srv->app( c, LSV_APPGOT );
- if ( 0 > got ) /* nok */
- goto err;
- if ( !(LSV_CONSES & srv->flg)
- && c->req && (sid = rGet( c->req, -LSV_SID, 0 ))
- ) {
- c->ses = cSesByName( (char*)sid->val, sid->len, &srv->tim, &expire );
- if ( ! c->ses ) { /* sad sad sad */
- sMsg( LOG_ERROR, "out of sessions" );
- goto err; /* drop connection */
- }
- if ( ! c->ses->accnt
- && (! c->ses->prop || ! c->ses->prop->len) /* overflow paranoia */
- )
- got = c->srv->app( c, LSV_APPSES );
- if ( 0 > got ) /* nok */
- goto err;
- }
- c->tim = srv->tim;
- RADDS( c->req, -LSV_TIME, srv->gtm, 1 );
- if ( c->ses ) {
- sMsg( LOG_DEBUG, "req %s for ses '%s' %s",
- c->nam, c->ses->name, c->ses->cur ? "busy" : "idle" );
- c->ses->accnt++;
- if ( c->ses->cur || c->ses->que ) { /* oh - busy !? */
- Con **cpp = &c->ses->que;
- int l = 0;
- for ( ; *cpp; cpp = &(*cpp)->nxt )
- if ( 4 < ++l ) {
- sMsg( LOG_WARN,
- "too many con queing on ses '%s' - dropping %s",
- c->ses->name, c->nam );
- goto err;
- }
- sMsg( LOG_VERBOSE, "queing con %s for ses '%s'",
- c->nam, c->ses->name );
- c->nxt = 0;
- *cpp = c;
- c->stg = LSV_STGSES;
- continue;
- }
- }
- /* yo - got it */
- lsv_penq( srv, c );
- continue;
- err:
- sMsg( ERR_BADF, "err 0x%x from %s", got, c->nam );
- eof:
- lio_close( &c->ios.file, LIO_INOUT );
- c->stg = LSV_STGNEW;
- FD_CLR( i, &fd );
- } /* read from the ready */
-
- srv->turn++;
- srv->busy += srv->nwr - srv->plen;
-#ifdef LSVTHR
- srv->wlen += srv->pool.que.len;
-
- sMsg( LOG_INFO,
- "\n==== %s\nturn %d: %d jobs for %d idle workers %d pending",
- srv->gtm, srv->turn, srv->pool.que.len, srv->plen, pending );
- for ( i=srv->nwr; i--; )
- if ( srv->wrk[i].cur )
- sMsg( LOG_INFO, "thr %d serving %d from %s",
- i, LIO_FD&srv->wrk[i].cur->ios.file, srv->wrk[i].cur->nam );
-
- inlock = 0;
- pthread_mutex_unlock( &srv->mut );
-#else
- lsv_wrk( srv->wrk );
-#endif
- } /* for(ever) */
-done:
-#ifdef LSVTHR
- if ( inlock )
- pthread_mutex_unlock( &srv->mut );
- srv->flg |= LSV_SHUTDN;
- for ( i=srv->nwr; i--; )
- pthread_cond_signal( &srv->wrk[i].todo );
-#endif
- return srv;
-} /* lsv_rcv */
-
-
-int svRun ( Srv *srv, const char *addr )
-{
- static const int yes = !0;
- struct sigaction siga;
- struct sockaddr_in sa; /* server address */
- int ret = 0, i;
-
- /* assume we're the main-thread
- -- try to make sure we get the controlling session
- */
- sGet();
-
- /* set defaults */
- if ( ! srv->sto )
- srv->sto = 30*60; /* 30 minutes session timeout */
-#ifndef LSVTHR
- srv->nwr = 1;
-#else
- if ( ! srv->nwr )
- srv->nwr = LSV_NUMWRK/2; /* half the max */
- else if ( LSV_NUMWRK < srv->nwr )
- srv->nwr = LSV_NUMWRK; /* max */
-
- srv->mut = lsv_mutex_init;
-#endif
- /* prepare signals */
- memset( &siga, 0, sizeof(siga) );
- siga.sa_sigaction = lsv_sighand;
- sigemptyset( &siga.sa_mask );
- sigaddset( &siga.sa_mask, SIGHUP );
- sigaddset( &siga.sa_mask, SIGINT );
- sigaddset( &siga.sa_mask, SIGTERM );
- sigaddset( &siga.sa_mask, SIGPIPE );
- siga.sa_flags = SA_SIGINFO;
- sigaction( SIGHUP, &siga, 0 );
- sigaction( SIGINT, &siga, 0 );
- sigaction( SIGTERM, &siga, 0 );
- sigaction( SIGPIPE, &siga, 0 );
-
- /* prepare socket */
- srv->lsn = socket( PF_INET, SOCK_STREAM, IPPROTO_TCP/*6*/ );
- /* fcntl( srv->lsn, O_NONBLOCK ); not needed */
- sa.sin_family = AF_INET;
- sa.sin_port = htons( addr ? a2i( addr, -1 ) : 8080 );
- sa.sin_addr.s_addr =
-#ifdef LSV_LOCALHOST_ONLY
- htonl(INADDR_LOOPBACK);
-#else
- INADDR_ANY;
-#endif
- if ( setsockopt( srv->lsn, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes) ) ) {
- ret = sMsg( LOG_SYSERR, "setsockopt" );
- goto inierr;
- }
- if ( bind( srv->lsn, (struct sockaddr*)&sa, sizeof(sa) ) ) {
- ret = sMsg( LOG_SYSERR, "bind on port %u", htons(sa.sin_port) );
- goto inierr;
- }
- if ( listen( srv->lsn, 64/*backlog*/ ) ) {
- ret = sMsg( LOG_SYSERR, "listen" );
- goto inierr;
- }
-#ifdef LSVTHR
- /* prepare pending event pipe */
- if ( pipe( srv->pip ) ) {
- ret = sMsg( LOG_SYSERR, "pipe" );
- goto inierr;
- }
-
- if ( pthread_key_create( &lsv_key_wrk, 0 ) ) {
- ret = sMsg( LOG_SYSERR, "key_create" );
- goto inierr;
- }
-#endif
- /* setup workers */
- memset( srv->wrk, 0, sizeof(srv->wrk) );
- for ( i=srv->nwr; i--; ) {
- srv->wrk[i].id = i;
- srv->wrk[i].srv = srv;
- srv->wrk[i].nxt = 0;
- srv->wrk[i].ses = cSession(0);
- memcpy( srv->wrk[i].ses->name, "wrk", 3 );
- u2a( srv->wrk[i].ses->name+3, i );
-#ifdef LSVTHR
- srv->wrk[i].todo = lsv_cond_init;
- if ( i )
- pthread_create( &srv->wrk[i].thr, 0, lsv_wrk, srv->wrk+i );
-#endif
- }
-#ifndef LSVTHR
- lsv_rcv( srv );
-#else
- /* start the server thread */
- pthread_create( &srv->rcv, 0, lsv_rcv, srv );
- /* run main worker */
- srv->wrk[0].thr = pthread_self();
- lsv_wrk( srv->wrk );
-
- /* cleanup */
- pthread_join( srv->rcv, 0 );
- for ( i=srv->nwr; i--; )
- pthread_join( srv->wrk[i].thr, 0 );
-#endif
- for ( i=srv->nwr; i--; )
- sMsg( LOG_INFO, "worker %d had %d jobs %d waits",
- i, srv->wrk[i].jobs, srv->wrk[i].waits );
- sMsg( LOG_INFO,
- "server had %d jobs on %d connections %d turns"
- " avg queue len %.02f workers %.02f",
- srv->jobs, srv->conn, srv->turn,
- srv->wlen/srv->turn, srv->busy/srv->turn );
-inierr:
- close( srv->lsn );
- return ret;
-} /* svRun */
-
-
-/** helper, probably should go to lio.
- cat l bytes from c to buf b flushing as needed.
- if ! c, empty buf b.
-*/
-int lio_sout ( Ios *s, Buf *b, const char *c, unsigned l )
-{
- if ( ! b )
- b = &s->b;
- do {
- if ( LIO_BUFSIZ/2 < b->fill && b->done ) { /* purge */
- if ( b->done < b->fill )
- memmove( b->c, b->c + b->done, b->fill - b->done );
- s->pos += b->done;
- b->fill -= b->done;
- b->done = 0;
- }
- if ( c && l && b->fill < LIO_BUFSIZ ) {
- unsigned cp = LIO_BUFSIZ - b->fill;
- if ( cp > l )
- cp = l;
- memcpy( b->c + b->fill, c, cp );
- b->fill += cp;
- c += cp;
- l -= cp;
- if ( !l )
- return 0;
- }
- if ( b->fill > b->done ) {
- int wr = lio_write( &s->file, b->c + b->done, b->fill - b->done );
- if ( 0 > wr )
- return wr;
- if ( ! wr )
- return -ERR_BUSY;
- b->done += wr;
- if ( b->done > b->fill ) {
- b->done = b->fill = 0;
- return -ERR_IDIOT;
- }
- }
- if ( b->done >= b->fill ) {
- s->pos += b->done;
- b->done = b->fill = 0;
- }
- } while ( c ? l : (unsigned)(b->fill > b->done) );
- return 0;
-} /* lio_sout */
-
-
-
-#define LF 10 /* LineFeed a.k.a. newline - '\n' isn't really well defined */
-#define TAB 9 /* horizontal, that is */
-#define VT 11 /* vertical, used as newline replacement */
-static const char lf = LF;
-static const char tab = TAB;
-static const char vt = VT;
-
-/* the plain protocol */
-int svPlain ( Ios *s, int op )
-{
- Field *f;
- Con *c = (Con*)s;
- switch ( op ) {
- case LIO_SPUSH: {
- int l = s->b.fill - s->b.done;
- unsigned char *b = s->b.c + s->b.done;
- unsigned char *end = b+l, *v, *p;
- if ( ! l ) { /* EOF: done */
- if ( ! c->prt ) /* ok */
- return 1;
- /* last field wasn't closed by LF */
- return sMsg( ERR_INVAL, "no EOL from %s", c->nam );
- }
- if ( c->prt )
- RSPACE( c->req, l, !0 );
- /* add text lines */
- while ( b<end ) {
- int conti = 0;
- switch ( c->prt ) {
- case 0: /* at beginning of line -- start new field */
- if ( LF == *b ) /* empty line */
- return 1;
- if ( TAB != *b || !c->req->len )
- RADD( c->req, 0,0,end-b, !0 );
- else { /* binary mode continuation line */
- conti = 1;
- if ( ! c->bin ) {
- sMsg( LOG_INFO,
- "detected binary mode on con %s", c->nam );
- c->bin = 1;
- }
- RSPACE( c->req, end-b, !0 );
- }
- if ( ! c->req )
- return -ERR_NOMEM;
- c->prt = 1;
- case 1: /* add to last field */
- f = c->req->field + c->req->len-1;
- v = (unsigned char*)f->val;
- p = v + f->len;
- if ( conti ) {
- *p++ = LF;
- b++;
- }
- if ( c->bin ) {
- for ( ; b<end && LF != (*p = *b++); p++ )
- ;
- } else
- for ( ; b<end && LF != (*p = *b++); p++ )
- if ( VT == *p ) /* convert VTABs */
- *p = LF; /* back to newlines */
- c->req->used += (p - v) - f->len;
- f->len = p - v;
- if ( LF == b[-1] ) {
- int ret = a2il( f->val, f->len, &f->tag );
- if ( ret ) {
- if ( ret < f->len && TAB == v[ret] )
- ret++;
- if ( ret < f->len )
- memmove( v, v+ret, f->len - ret );
- f->len -= ret;
- }
- ret = c->srv->app( c, LSV_APPARG );
- c->prt = 0;
- if ( ret )
- return ret;
- }
- sMsg( LOG_INFO, "prs from %s: [%2d] %3d = '%.*s'",
- c->nam, c->req->len-1, f->tag, f->len, f->val );
- }
- }
- return 0;
- } /* case LIO_SPUSH */
- case LIO_SFLUSH:
- if ( s->b.fill <= s->b.done )
- return 0;
- case LIO_SCLOSE:
- if ( !(LIO_OUT & s->file) )
- return -ERR_INVAL;
- /*
- this switch is not protected, since receiver doesn't care
- */
- if ( c->stg < LSV_STGCOM )
- c->stg = LSV_STGCOM;
- if ( ! s->pos+s->b.done && c->ses->res ) {
- char num[32];
- int n = c->ses->res->len;
- sMsg( LOG_INFO, "com %d fields to %s", c->ses->res->len, c->nam );
- for ( f = c->ses->res->field; n--; f++ ) {
- const char *v = f->val, *nl;
- int vl = f->len;
- int nlen = i2a( num, f->tag );
- num[nlen++] = TAB;
- lio_sout( s, &c->flt, num, nlen );
- while ( vl && (nl = memchr(v,'\n',vl)) ) {
- int ll = 1+(nl-v);
- if ( c->bin ) {
- lio_sout( s, &c->flt, v, ll );
- lio_sout( s, &c->flt, &tab, 1 );
- } else {
- lio_sout( s, &c->flt, v, ll-1 );
- lio_sout( s, &c->flt, &vt, 1 );
- }
- v += ll;
- vl -= ll;
- }
- if ( vl )
- lio_sout( s, &c->flt, v, vl );
- lio_sout( s, &c->flt, &lf, 1 );
- }
- }
- if ( s->b.fill )
- s->b.done = s->b.fill = 0;
- /*
- {
- lio_sout( s, &c->flt, 0, 0 );
- lio_stdio( s, LIO_SFLUSH );
- }
- */
- if ( LIO_SCLOSE == op )
- lio_sout( s, &c->flt, &lf, 1 );
- lio_sout( s, &c->flt, 0, 0 ); /* finally flush */
- return 0;
- }
- return lio_stream( s, op ); /* hmm -- SPURGE ? */
-} /* svPlain */
-
-
-/* the echo application */
-int svEcho ( Con *c, int task )
-{
-#ifdef LSVECHO_DELAY
- static Tm tm = { LLL(50) };
-#endif
- Field *f;
- int i;
- if ( task ) { /* preparing: we don't care */
- if ( LSV_APPGOT == task )
- c->grp = 1; /* don't require mainthead */
- return 0;
- }
- /* run it */
- sMsg( LOG_INFO, "echo %d: %d fields to %s",
- svCur()->id, c->req->len, c->nam );
-#ifdef LSVECHO_DELAY
- timeSleep( &tm );
-#endif
- for ( i=c->req->len, f = c->req->field; i--; f++ ) {
- RADDF( c->ses->res, f, !0 );
- /*
- plain protocol actually does not support plain output ...
- sMsg( c->ses, 0, "%d = '%.*s'\n", f->tag, f->len, f->val );
- */
- }
- return 0;
-} /* svEcho */
-
-#if 0
-int main ( int argc, const char **argv )
-{
- Srv echo;
-
- memset( &echo, 0, sizeof(echo) );
- echo.prt = lsv_plain;
- echo.app = lsv_echo;
- cOpen(0);
- cLog( LOG_DEBUG, 0 );
- return svRun( &echo, 1<argc ? argv[1] : 0 );
-}
-#endif