removed template
[webpac] / openisis / lsv.c
1 /*
2         openisis - an open implementation of the CDS/ISIS database
3         Version 0.8.x (patchlevel see file Version)
4         Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org
5
6         This library is free software; you can redistribute it and/or
7         modify it under the terms of the GNU Lesser General Public
8         License as published by the Free Software Foundation; either
9         version 2.1 of the License, or (at your option) any later version.
10
11         This library is distributed in the hope that it will be useful,
12         but WITHOUT ANY WARRANTY; without even the implied warranty of
13         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14         Lesser General Public License for more details.
15
16         You should have received a copy of the GNU Lesser General Public
17         License along with this library; if not, write to the Free Software
18         Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
20         see README for more information
21 EOH */
22
23 /*
24         $Id: lsv.c,v 1.14 2003/06/11 14:53:26 kripke Exp $
25         OpenIsis server
26 */
27
28 #include <sys/select.h>
29 #include <sys/time.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <unistd.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <string.h> /* memcpy et al */
36 #include <signal.h>
37 #include <errno.h>
38
39
40 #include "lsv.h"
41
42
43
44 static Con *con[FD_SETSIZE]; /* usually 1024, but up to 1<<16 */
45 #ifdef LSVTHR
46 static pthread_mutex_t lsv_mutex_init = PTHREAD_MUTEX_INITIALIZER;
47 static pthread_cond_t  lsv_cond_init  = PTHREAD_COND_INITIALIZER;
48 static pthread_key_t   lsv_key_wrk;
49 #else
50 static Wrk *lsv_wrk_single;
51 #endif
52 static int lsv_term;
53 static siginfo_t lsv_sig;
54
/* printable names of the pending state of a connection, indexed by Con.pdg */
static const char * pdg[] = { "---", "NEW", "RED", "EOF" };

/* printable names of the processing stage of a connection, indexed by Con.stg */
static const char * stg[] = {
	"NEW", "CON", "INP",
	"SES", "WRK", "RUN",
	"COM", "DON", "RET"
};
72
73 static void lsv_sighand ( int sig, siginfo_t *info, void *ucontext )
74 {
75         if ( SIGINT == sig || SIGTERM == sig )
76                 lsv_term = !0;
77         if ( info )
78                 lsv_sig = *info;
79         else { /* ??? */
80                 lsv_sig.si_signo = sig;
81                 lsv_sig.si_pid = 0;
82         }
83         (void)ucontext;
84 }       /* lsv_sighand */
85
86
87 static int lsv_enq ( Que *q, Con *c )
88 {
89         c->nxt = 0;
90         q->len++;
91         if ( q->tail ) {
92                 q->tail->nxt = c;
93                 q->tail = c;
94                 return 1;
95         }
96         q->head = q->tail = c;
97         return 0;
98 }       /* lsv_enq */
99
100
101 static void lsv_penq ( Srv *srv, Con *c )
102 {
103         Pool *work =
104 #ifdef LSVTHR
105                 c->grp ? &srv->pool :
106 #endif
107                 &srv->main;
108         srv->jobs++;
109         c->stg = LSV_STGWRK;
110         if ( c->ses )
111                 c->ses->cur = c;
112         lsv_enq( &work->que, c );
113         sMsg( LOG_DEBUG, "enq req %d from %s qlen %d",
114                 LIO_FD&c->ios.file, c->nam, work->que.len );
115 #ifdef LSVTHR
116         if ( work->wait ) {
117                 Wrk *w = work->wait;
118                 work->wait = w->nxt;
119                 w->nxt = 0;
120                 srv->plen--;
121                 pthread_cond_signal( &w->todo );
122                 sMsg( LOG_DEBUG, "woke %d of %d", w->id, srv->plen );
123         }
124 #endif
125 }       /* lsv_penq */
126
127
128 static Con *lsv_deq ( Que *q )
129 {
130         Con *c = q->head;
131         if ( c ) {
132                 q->len--;
133                 q->head = c->nxt;
134                 c->nxt = 0;
135                 if ( q->tail == c )
136                         q->tail = 0;
137         }
138         return c;
139 }       /* lsv_deq */
140
141
142 Wrk *svCur ()
143 {
144 #ifdef LSVTHR
145         return (Wrk*)pthread_getspecific( lsv_key_wrk );
146 #else
147         return lsv_wrk_single;
148 #endif
149 }
150
151
152 /**
153         in threaded mode, run forever, waiting for jobs.
154         non-threaded, do the jobs that are available and return
155 */
156 static void *lsv_wrk ( void *arg )
157 {
158         Wrk *self = (Wrk*)arg;
159         Srv *srv = self->srv;
160         Con *job = 0;
161         Pool *work =
162 #ifdef LSVTHR
163                 self->id ? &srv->pool :
164 #endif
165                 &srv->main;
166 #ifdef LSVTHR
167         sigset_t blk;
168         
169         /* block signals, they should go to the receiver */
170         sigemptyset( &blk );
171         sigaddset( &blk, SIGHUP );
172         sigaddset( &blk, SIGINT );
173         sigaddset( &blk, SIGTERM );
174         sigaddset( &blk, SIGPIPE );
175         pthread_sigmask( SIG_BLOCK, &blk, 0 );
176
177         pthread_setspecific( lsv_key_wrk, arg );
178         if ( self->id )
179                 srv->app( 0, LSV_APPINI );
180 #else
181         lsv_wrk_single = self;
182         if ( !self->ses )
183 #endif
184         self->ses = sGet(); /* allocate session for thread */
185
186         for (;/*ever*/;) {
187                 int ret;
188
189                 /* ****************************************
190                         critical section enter
191                 */
192 #ifdef LSVTHR
193                 pthread_mutex_lock( &srv->mut );
194 #endif
195                 if ( job ) { /* from last turn */
196                         sMsg( LOG_DEBUG, "thr %d enq job %s pdg %s",
197                                 self->id, job->nam, pdg[job->pdg] );
198                         job->stg = LSV_STGRET;
199                         lsv_enq( &srv->recv, job );
200 #ifdef LSVTHR
201                         if ( job->pdg ) /* wakeup receiver */
202                                 write( srv->pip[1], &job->pdg, 1 );
203 #endif
204                         if ( job->ses->que ) { /* immediatly reschedule queued job */
205                                 Con *q = job->ses->que;
206                                 job->ses->que = q->nxt;
207                                 sMsg( LOG_VERBOSE, "dequeing con %s for ses '%s'",
208                                         q->nam, job->ses->name );
209                                 lsv_penq( srv, q );
210                         }
211                 }
212                 self->cur = 0;
213                 while ( !(job = lsv_deq( &work->que ))
214                         && !(LSV_SHUTDN & srv->flg)
215                 ) {
216 #ifndef LSVTHR
217                         return self;
218 #else
219                         self->nxt = work->wait;
220                         work->wait = self; /* pls pls lemme wrk */
221                         srv->plen++;
222                         self->waits++;
223                         sMsg( LOG_DEBUG, "thr %d waiting", self->id );
224                         /* wait releases the mutex */
225                         pthread_cond_wait( &self->todo, &srv->mut );
226 #endif
227                 }
228                 self->cur = job;
229                 if ( job )
230                         job->stg = LSV_STGRUN;
231 #ifdef LSVTHR
232                 pthread_mutex_unlock( &srv->mut );
233 #endif
234                 /* ****************************************
235                         critical section leave
236                 */
237
238                 /* end of monday morning meeting blah blah -- start working */
239                 if ( ! job || (LSV_SHUTDN & srv->flg) )
240                         break;
241                 self->jobs++;
242                 if ( job->ses )
243                         sSet( job->ses );
244                 else
245                         job->ses = self->ses;
246                 job->ses->io[1] = &job->ios;
247                 if ( job->ses->res )
248                         CLRREC( job->ses->res );
249                 sMsg( LOG_DEBUG, "thr %d run job %s ses '%s'.%d",
250                         self->id, job->nam, job->ses->name, job->ses->accnt );
251                 ret = srv->app( job, LSV_APPRUN );
252                 sMsg( LOG_DEBUG, "thr %d got %d buf %d",
253                         self->id, ret, LIO_SAVAIL( &job->ios ) );
254
255                 /*
256                 lock only in order to create SMP memory barrier.
257                 on a single processor system, the byte should be safely readable
258                 by receiver without this lock.
259                 TOD: check alternatives:
260                 since receiver is checking the stage of active connections
261                 only in rare cases, it might be more efficient to guard either
262                 this set or the processing up to here with a per thread mutex.
263                 */
264 #ifdef LSVTHR
265                 pthread_mutex_lock( &srv->mut );
266 #endif
267                 job->stg = LSV_STGDON;
268                 job->ses->cur = 0;
269 #ifdef LSVTHR
270                 pthread_mutex_unlock( &srv->mut );
271 #endif
272
273                 ret = srv->prt( &job->ios, LIO_SCLOSE );
274                 job->ses->io[1] = 0;
275                 sMsg( LOG_DEBUG, "thr %d done job %s", self->id, job->nam );
276                 if ( job->ses != self->ses )
277                         sSet( self->ses ); /* reset session */
278         }
279 #ifndef LSVTHR
280         if ( self->id )
281                 srv->app( 0, LSV_APPFIN );
282 #endif
283         return self;
284 }       /* lsv_wrk */
285
286
287 static void *lsv_rcv ( void *arg )
288 {
289         Srv *srv = (Srv*)arg;
290         fd_set fd;
291         int fdlen = 0;
292         int pending = 0;
293         int ret = 0, sel, i;
294 #ifdef LSVTHR
295         int inlock = 0;
296 #endif
297
298         FD_ZERO( &fd );
299         FD_SET( srv->lsn, &fd );
300         fdlen = srv->lsn+1;
301 #ifdef LSVTHR
302         FD_SET( srv->pip[0], &fd );
303         if ( fdlen < srv->pip[0]+1 )
304                 fdlen = srv->pip[0]+1;
305 #endif
306
307         for (;/*ever*/;) {
308                 struct sockaddr_in npeer; /* of new connection */
309                 int nsock = -1; /* new socket */
310                 Con *c;
311                 fd_set ready = fd;
312                 Tm expire;
313
314                 /* select on read only; writing is done in workers and
315                         exceptions (i.e. telnet OOB data) are not used
316                 */
317 #ifndef LSVTHR
318                 if ( srv->recv.len ) { /* instead of listening on pip */
319                         memset( &ready, 0, sizeof(ready) );
320                         sel = 0;
321                 } else
322 #endif
323                 if ( 0 > (sel = select( fdlen, &ready, 0, 0, 0 )) ) {
324                         if ( EINTR == errno ) {
325                                 sMsg( LOG_DEBUG, "select woken by signal" );
326                         } else {
327                                 ret = sMsg( LOG_SYSERR, "select" );
328                                 goto done;
329                         }
330                 }
331                 if ( lsv_sig.si_signo ) {
332                         sMsg( LOG_DEBUG, "SIG%s (%d) from pid %d",
333                                 SIGHUP==lsv_sig.si_signo ? "HUP" :
334                                 SIGINT==lsv_sig.si_signo ? "INT" :
335                                 SIGTERM==lsv_sig.si_signo ? "TERM" :
336                                 SIGPIPE==lsv_sig.si_signo ? "PIPE" : "?",
337                                 lsv_sig.si_signo, lsv_sig.si_pid );
338                         if ( 1 /*SIGHUP==lsv_sig.si_signo*/ ) {
339                                 for ( i=FD_SETSIZE; i--; )
340                                         if ( con[i] ) {
341                                                 c = con[i];
342                                                 sMsg( LOG_INFO,
343                                                         "con %3d peer %s stg %s prt %d app %d ios %d %s (0x%x)",
344                                                         i, c->nam, stg[c->stg], c->prt, c->app,
345                                                         c->ios.pos + c->ios.b.done,
346                                                         LIO_SISOPEN(&c->ios) ? "opn" : "clo", c->ios.file );
347                                         }
348                         }
349                         if ( lsv_term ) /* even if it wasn't the last signal delivered */
350                                 goto done;
351                         lsv_sig.si_signo = 0;
352                         /* must NOT access the set after interrupt,
353                                 probably all fds are set
354                         */
355                         continue;
356                 }
357                 timeUpd( &srv->tim );
358                 timeGtfm( srv->gtm, &srv->tim );
359                 expire.millis = srv->tim.millis - srv->sto*LLL(1000);
360
361 #ifdef LSVTHR
362                 pthread_mutex_lock( &srv->mut ); /* back in locked land */
363                 inlock = !0;
364 #endif
365
366                 /* -- we're the acceptor */
367                 if ( FD_ISSET( srv->lsn, &ready ) ) { /* new connection */
368                         unsigned /*socklen_t is broken*/ alen = sizeof(npeer);
369                         nsock = accept( srv->lsn, (struct sockaddr*)&npeer, &alen );
370                         if ( 0 > nsock ) {
371                                 ret = sMsg( LOG_SYSERR, "accept" );
372                                 goto done;
373                         }
374                         if ( FD_SETSIZE <= nsock ) {
375                                 ret = sMsg( ERR_BADF,
376                                         "socket %d >= FD_SETSIZE %d", nsock, FD_SETSIZE );
377                                 goto done;
378                         }
379                         if ( sizeof(npeer) != alen ) {
380                                 ret = sMsg( ERR_INVAL, "bad peer len %d", alen );
381                                 goto done;
382                         }
383                         /* ok */
384                         /* setsockopt
385                                 SO_SNDTIMEO fixed value on linux
386                                 SO_OOBINLINE ... hmm, don't care
387                                 SO_LINGER off
388                         */
389                         /* prepare con */
390                         if ( !(c = con[nsock]) ) {
391                                 c = con[nsock] = mAlloc( sizeof(*c) ); 
392                                 c->ios.file = nsock; /* preliminary -- further setup below */
393                         }
394                         c->pdg = LSV_PDGNEW;
395                         switch ( c->stg ) {
396                         case LSV_STGCON: /* still in control of receiver */
397                         case LSV_STGINP:
398                                 sMsg( ERR_IDIOT, "bad recv stage %d on new con", c->stg );
399                                 c->stg = LSV_STGNEW;
400                         case LSV_STGNEW: /* clean idle socket -- pass to receiver */
401                                 lsv_enq( &srv->recv, c );
402                                 break;
403                         case LSV_STGSES: /* still in control of receiver */
404                         case LSV_STGWRK:
405                                 sMsg( ERR_IDIOT, "bad worker stage %d on new con", c->stg );
406                         case LSV_STGRUN: /* worker will queue it */
407                                 /* hmmm ... should have COM ... */
408                         case LSV_STGCOM:
409                         case LSV_STGDON:
410                         case LSV_STGRET: /* worker has queued it */
411                                 pending++;
412                                 break;
413                         }
414                         srv->conn++;
415                         FD_CLR( srv->lsn, &ready ); /* skip in fd check */
416                 } /* new connection */
417
418 #ifdef LSVTHR
419                 if ( FD_ISSET( srv->pip[0], &ready ) ) { /* pending events */
420                         char trash[LSV_NUMWRK];
421                         read( srv->pip[0], trash, sizeof(trash) );
422                         FD_CLR( srv->pip[0], &ready );
423                         sel--;
424                 }
425 #endif
426
427                 /* switching roles -- now we're the receiver */
428                 while ( (c = lsv_deq( &srv->recv )) ) { /* care for ready connections */
429                         int sock = LIO_FD & c->ios.file;
430                         sMsg( LOG_DEBUG, "deq %d from %s pdg %s",
431                                 sock, c->nam, pdg[c->pdg] );
432                         if ( c->pdg && c->stg )
433                                 pending--;
434                         if ( !(LIO_IN & c->ios.file) && (LIO_RDWR & c->ios.file) ) {
435                                 /* cleanup closed socks */
436                                 if ( c != con[sock] ) {
437                                         ret = sMsg( ERR_IDIOT,
438                                                 "deq bad sock %d (0x%x)", sock, c->ios.file );
439                                         goto done; /* PANIC time */
440                                 }
441                                 sMsg( LOG_DEBUG, "clos%s sock %d",
442                                         (LIO_OUT & c->ios.file) ? "ing" : "ed", sock );
443                                 if ( (LIO_OUT & c->ios.file) )
444                                         lio_close( &c->ios.file, LIO_INOUT );
445                                 c->ios.file &= ~LIO_RDWR; /* clear also, so we're not closing again */
446                                 FD_CLR( sock, &fd );
447                                 FD_CLR( sock, &ready ); /* probably an EOF to read */
448                                 if ( fdlen == sock+1 )
449                                         while ( fdlen
450                                                 /* && !FD_ISSET( fdlen-1, &fd ) risk of muting tmp. disabled */
451                                                 && (!con[fdlen-1] || !LIO_SISOPEN(&con[fdlen-1]->ios))
452                                         )
453                                                 fdlen--;
454                         }
455                         if ( LSV_PDGNEW == c->pdg ) { /* new connection */
456                                 struct sockaddr_in peer; /* of new connection */
457                                 if ( nsock == sock ) /* the one accepted just above */
458                                         peer = npeer;
459                                 else {
460                                         unsigned alen = sizeof(peer);
461                                         if ( getpeername( sock, (struct sockaddr*)&peer, &alen )
462                                                 || sizeof(peer) != alen
463                                         ) {
464                                                 ret = sMsg( LOG_SYSERR, "getpeername alen %d", alen );
465                                                 goto done;
466                                         }
467                                 }
468                                 c->srv = srv;
469                                 c->ses = 0;
470                                 {
471                                         unsigned host = (unsigned)ntohl( peer.sin_addr.s_addr );
472                                         unsigned short port = (unsigned short)ntohs( peer.sin_port );
473                                         char *p = c->nam;
474                                         c->host = host;
475                                         memset( c->nam, 0, sizeof(c->nam) );
476                                         p += u2a( p, 0xff & (host >> 24) ); *p++ = '.';
477                                         p += u2a( p, 0xff & (host >> 16) ); *p++ = '.';
478                                         p += u2a( p, 0xff & (host >>  8) ); *p++ = '.';
479                                         p += u2a( p, 0xff & (host      ) ); *p++ = ':';
480                                         p += u2a( p, port );
481                                         /* *p++ = '@'; p += u2a( p, sock ); */
482                                 }
483                                 c->con++; /* never mind overflow ... */
484                                 sMsg( LOG_INFO,
485                                         "connect %d on %d from %s", c->con+1, sock, c->nam );
486                                 LIO_SINIT( &c->ios, srv->prt, c->nam,
487                                         sock | LIO_INOUT|LIO_RDWR|LIO_SOCK );
488                                 LIO_BINIT( &c->flt );
489                                 FD_SET( sock, &fd );
490                                 if ( fdlen < sock+1 )
491                                         fdlen = sock+1;
492                                 c->bin = 0; /* do not reset on every request */
493                         }
494                         if ( ! (LIO_IN & c->ios.file) )
495                                 c->stg = LSV_STGNEW;
496                         else {
497                                 c->stg = LSV_STGCON;
498                                 if ( ! FD_ISSET( sock, &fd ) ) {
499                                         sMsg( LOG_DEBUG, "reenabling %s", c->nam );
500                                         FD_SET( sock, &fd );
501                                         if ( fdlen < sock+1 )
502                                                 fdlen = sock+1;
503                                 }
504                         } 
505                         c->grp = c->pdg = c->prt = c->app = 0;
506                         /* prepare for new request */
507                         c->ios.pos = 0;
508                         c->ios.b.done = c->ios.b.fill = 0;
509                         if ( c->req )
510                                 CLRREC( c->req );
511                         else {
512                                 /* this is done in the receiver rather than the acceptor
513                                         since in multi-receiver model, receiver owns the req
514                                 */
515                                 c->req = mAlloc( LSV_BUFSIZ );
516                                 OPENISIS_INITREC( c->req, LSV_BUFSIZ, 64 );
517                         }
518                         if ( !(LSV_CONSES & srv->flg) )
519                                 c->ses = 0;
520                 }       /* prepare connections */
521
522
523                 for ( i=fdlen; sel && i--; ) { /* read from the ready */
524                         Field *sid;
525                         int got;
526                         if ( ! FD_ISSET( i, &ready ) )
527                                 continue;
528                         sel--;
529                         c = con[i];
530                         if ( ! c ) {
531                                 ret = sMsg( ERR_IDIOT, "ran on empty con %d", i );
532                                 goto done;
533                         }
534                         if ( LSV_STGINP < c->stg ) {
535                                 sMsg( LOG_DEBUG, "con %d busy stage %d!", i, c->stg );
536                                 c->pdg = LSV_PDGRED;
537                                 pending++;
538                                 FD_CLR( i, &fd ); /* mute this */
539                                 continue;
540                         }
541                         c->ios.b.done = 0; 
542                         got = lio_read( &c->ios.file, c->ios.b.c, sizeof(c->ios.b.c) );
543                         if ( 0 >= got ) {
544                                 if ( -ERR_EOF != got )
545                                         goto err;
546                                 sMsg( LOG_DEBUG, "EOF on 0x%x line %d from %s",
547                                         c->ios.file, c->req->len, c->nam );
548                                 FD_CLR( i, &fd );
549                                 if ( ! c->req->len ) /* initial eof */
550                                         goto eof; /* close immediatly */
551                                 /* lio_close( &c->ios.file, LIO_IN ); */
552                                 c->ios.file &= ~LIO_IN; /* don't shut */
553                                 c->pdg = LSV_PDGEOF; /* so it will be dequed and closed */
554                                 pending++;
555                                 got = 0; /* tell proto to close ... */
556                         }
557                         if ( ! c->req->len ) { /* stage should be CON */
558                                 RADDS( c->req, -LSV_PEER, c->nam, 1 );
559                                 sMsg( LOG_DEBUG, "req on %d from %s",
560                                         LIO_FD&c->ios.file, c->nam );
561                         }
562                         c->stg = LSV_STGINP;
563                         c->ios.b.fill = got;
564                         got = srv->prt( &c->ios, LIO_SPUSH );
565                         sMsg( LOG_DEBUG, "got 0x%x from %s", got, c->nam );
566                         if ( ! got && (LIO_IN & c->ios.file) ) /* ok: more next time */
567                                 continue;
568                         if ( 0 > got ) /* nok */
569                                 goto err;
570                         got = c->srv->app( c, LSV_APPGOT );
571                         if ( 0 > got ) /* nok */
572                                 goto err;
573                         if ( !(LSV_CONSES & srv->flg)
574                                 && c->req && (sid = rGet( c->req, -LSV_SID, 0 ))
575                         ) {
576                                 c->ses = cSesByName( (char*)sid->val, sid->len, &srv->tim, &expire );
577                                 if ( ! c->ses ) { /* sad sad sad */
578                                         sMsg( LOG_ERROR, "out of sessions" );
579                                         goto err; /* drop connection */
580                                 }
581                                 if ( ! c->ses->accnt
582                                         && (! c->ses->prop || ! c->ses->prop->len) /* overflow paranoia */
583                                 )
584                                         got = c->srv->app( c, LSV_APPSES );
585                                 if ( 0 > got ) /* nok */
586                                         goto err;
587                         }
588                         c->tim = srv->tim;
589                         RADDS( c->req, -LSV_TIME, srv->gtm, 1 );
590                         if ( c->ses ) {
591                                 sMsg( LOG_DEBUG, "req %s for ses '%s' %s",
592                                         c->nam, c->ses->name, c->ses->cur ? "busy" : "idle" );
593                                 c->ses->accnt++;
594                                 if ( c->ses->cur || c->ses->que ) { /* oh - busy !? */
595                                         Con **cpp = &c->ses->que;
596                                         int l = 0;
597                                         for ( ; *cpp; cpp = &(*cpp)->nxt )
598                                                 if ( 4 < ++l ) {
599                                                         sMsg( LOG_WARN,
600                                                                 "too many con queing on ses '%s' - dropping %s",
601                                                                 c->ses->name, c->nam );
602                                                         goto err;
603                                                 }
604                                         sMsg( LOG_VERBOSE, "queing con %s for ses '%s'",
605                                                 c->nam, c->ses->name );
606                                         c->nxt = 0;
607                                         *cpp = c;
608                                         c->stg = LSV_STGSES;
609                                         continue;
610                                 }
611                         }
612                         /* yo - got it */
613                         lsv_penq( srv, c );
614                         continue;
615                 err:
616                         sMsg( ERR_BADF, "err 0x%x from %s", got, c->nam );
617                 eof:
618                         lio_close( &c->ios.file, LIO_INOUT );
619                         c->stg = LSV_STGNEW;
620                         FD_CLR( i, &fd );
621                 } /* read from the ready */
622
623                 srv->turn++;
624                 srv->busy += srv->nwr - srv->plen;
625 #ifdef LSVTHR
626                 srv->wlen += srv->pool.que.len;
627
628                 sMsg( LOG_INFO,
629                         "\n==== %s\nturn %d: %d jobs for %d idle workers %d pending",
630                         srv->gtm, srv->turn, srv->pool.que.len, srv->plen, pending );
631                 for ( i=srv->nwr; i--; )
632                         if ( srv->wrk[i].cur )
633                                 sMsg( LOG_INFO, "thr %d serving %d from %s",
634                                         i, LIO_FD&srv->wrk[i].cur->ios.file, srv->wrk[i].cur->nam );
635
636                 inlock = 0;
637                 pthread_mutex_unlock( &srv->mut );
638 #else
639                 lsv_wrk( srv->wrk );
640 #endif
641         } /* for(ever) */
642 done:
643 #ifdef LSVTHR
644         if ( inlock )
645                 pthread_mutex_unlock( &srv->mut );
646         srv->flg |= LSV_SHUTDN;
647         for ( i=srv->nwr; i--; )
648                 pthread_cond_signal( &srv->wrk[i].todo );
649 #endif
650         return srv;
651 }       /* lsv_rcv */
652
653
654 int svRun ( Srv *srv, const char *addr )
655 {
656         static const int yes = !0;
657         struct sigaction siga;
658         struct sockaddr_in sa; /* server address */
659         int ret = 0, i;
660
661         /* assume we're the main-thread
662                 -- try to make sure we get the controlling session
663         */
664         sGet();
665
666         /* set defaults */
667         if ( ! srv->sto )
668                 srv->sto = 30*60; /* 30 minutes session timeout */
669 #ifndef LSVTHR
670         srv->nwr = 1;
671 #else
672         if ( ! srv->nwr )
673                 srv->nwr = LSV_NUMWRK/2; /* half the max */
674         else if ( LSV_NUMWRK < srv->nwr )
675                 srv->nwr = LSV_NUMWRK; /* max */
676
677         srv->mut = lsv_mutex_init;
678 #endif
679         /* prepare signals */
680         memset( &siga, 0, sizeof(siga) );
681         siga.sa_sigaction = lsv_sighand;
682         sigemptyset( &siga.sa_mask );
683         sigaddset( &siga.sa_mask, SIGHUP );
684         sigaddset( &siga.sa_mask, SIGINT );
685         sigaddset( &siga.sa_mask, SIGTERM );
686         sigaddset( &siga.sa_mask, SIGPIPE );
687         siga.sa_flags = SA_SIGINFO;
688         sigaction( SIGHUP, &siga, 0 );
689         sigaction( SIGINT, &siga, 0 );
690         sigaction( SIGTERM, &siga, 0 );
691         sigaction( SIGPIPE, &siga, 0 );
692         
693         /* prepare socket */
694         srv->lsn = socket( PF_INET, SOCK_STREAM, IPPROTO_TCP/*6*/ );
695         /* fcntl( srv->lsn, O_NONBLOCK ); not needed */
696         sa.sin_family = AF_INET;
697         sa.sin_port = htons( addr ? a2i( addr, -1 ) : 8080 );
698         sa.sin_addr.s_addr =
699 #ifdef LSV_LOCALHOST_ONLY
700                 htonl(INADDR_LOOPBACK);
701 #else
702                 INADDR_ANY;
703 #endif
704         if ( setsockopt( srv->lsn, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes) ) ) {
705                 ret = sMsg( LOG_SYSERR, "setsockopt" );
706                 goto inierr;
707         }
708         if ( bind( srv->lsn, (struct sockaddr*)&sa, sizeof(sa) ) ) {
709                 ret = sMsg( LOG_SYSERR, "bind on port %u", htons(sa.sin_port) );
710                 goto inierr;
711         }
712         if ( listen( srv->lsn, 64/*backlog*/ ) ) {
713                 ret = sMsg( LOG_SYSERR, "listen" );
714                 goto inierr;
715         }
716 #ifdef LSVTHR
717         /* prepare pending event pipe */
718         if ( pipe( srv->pip ) ) {
719                 ret = sMsg( LOG_SYSERR, "pipe" );
720                 goto inierr;
721         }
722
723         if ( pthread_key_create( &lsv_key_wrk, 0 ) ) {
724                 ret = sMsg( LOG_SYSERR, "key_create" );
725                 goto inierr;
726         }
727 #endif
728         /* setup workers */
729         memset( srv->wrk, 0, sizeof(srv->wrk) );
730         for ( i=srv->nwr; i--; ) {
731                 srv->wrk[i].id  = i;
732                 srv->wrk[i].srv = srv;
733                 srv->wrk[i].nxt = 0;
734                 srv->wrk[i].ses = cSession(0);
735                 memcpy( srv->wrk[i].ses->name, "wrk", 3 );
736                 u2a( srv->wrk[i].ses->name+3, i );
737 #ifdef LSVTHR
738                 srv->wrk[i].todo = lsv_cond_init;
739                 if ( i )
740                         pthread_create( &srv->wrk[i].thr, 0, lsv_wrk, srv->wrk+i );
741 #endif
742         }
743 #ifndef LSVTHR
744         lsv_rcv( srv );
745 #else
746         /* start the server thread */
747         pthread_create( &srv->rcv, 0, lsv_rcv, srv );
748         /* run main worker */
749         srv->wrk[0].thr = pthread_self();
750         lsv_wrk( srv->wrk );
751
752         /* cleanup */
753         pthread_join( srv->rcv, 0 );
754         for ( i=srv->nwr; i--; )
755                 pthread_join( srv->wrk[i].thr, 0 );
756 #endif
757         for ( i=srv->nwr; i--; )
758                 sMsg( LOG_INFO, "worker %d had %d jobs %d waits",
759                         i, srv->wrk[i].jobs, srv->wrk[i].waits );
760         sMsg( LOG_INFO,
761                 "server had %d jobs on %d connections %d turns"
762                 " avg queue len %.02f workers %.02f",
763                 srv->jobs, srv->conn, srv->turn,
764                 srv->wlen/srv->turn, srv->busy/srv->turn );
765 inierr:
766         close( srv->lsn );
767         return ret;
768 }       /* svRun */
769
770
771 /** helper, probably should go to lio.
772         cat l bytes from c to buf b flushing as needed.
773         if ! c, empty buf b.
774 */
775 int lio_sout ( Ios *s, Buf *b, const char *c, unsigned l )
776 {
777         if ( ! b )
778                 b = &s->b;
779         do {
780                 if ( LIO_BUFSIZ/2 < b->fill && b->done ) { /* purge */
781                         if ( b->done < b->fill )
782                                 memmove( b->c, b->c + b->done, b->fill - b->done );
783                         s->pos += b->done;
784                         b->fill -= b->done;
785                         b->done = 0;
786                 }
787                 if ( c && l && b->fill < LIO_BUFSIZ ) {
788                         unsigned cp = LIO_BUFSIZ - b->fill;
789                         if ( cp > l )
790                                 cp = l;
791                         memcpy( b->c + b->fill, c, cp );
792                         b->fill += cp;
793                         c += cp;
794                         l -= cp;
795                         if ( !l )
796                                 return 0;
797                 }
798                 if ( b->fill > b->done ) {
799                         int wr = lio_write( &s->file, b->c + b->done, b->fill - b->done );
800                         if ( 0 > wr )
801                                 return wr;
802                         if ( ! wr )
803                                 return -ERR_BUSY;
804                         b->done += wr;
805                         if ( b->done > b->fill ) {
806                                 b->done = b->fill = 0;
807                                 return -ERR_IDIOT;
808                         }
809                 }
810                 if ( b->done >= b->fill ) {
811                         s->pos += b->done;
812                         b->done = b->fill = 0;
813                 }
814         } while ( c ? l : (unsigned)(b->fill > b->done) );
815         return 0;
816 }       /* lio_sout */
817
818
819
/*
	control characters of the plain protocol, given by numeric code
	since '\n' isn't really well defined
*/
#define TAB 0x09 /* horizontal tab */
#define LF  0x0A /* LineFeed a.k.a. newline */
#define VT  0x0B /* vertical tab, used as newline replacement */
/* addressable one byte copies of the above */
static const char tab = TAB;
static const char lf = LF;
static const char vt = VT;
826
827 /* the plain protocol */
828 int svPlain ( Ios *s, int op )
829 {
830         Field *f;
831         Con *c = (Con*)s;
832         switch ( op ) {
833         case LIO_SPUSH: {
834                 int l = s->b.fill - s->b.done;
835                 unsigned char *b = s->b.c + s->b.done;
836                 unsigned char *end = b+l, *v, *p;
837                 if ( ! l ) { /* EOF: done */
838                         if ( ! c->prt ) /* ok */
839                                 return 1;
840                         /* last field wasn't closed by LF */
841                         return sMsg( ERR_INVAL, "no EOL from %s", c->nam );
842                 }
843                 if ( c->prt )
844                         RSPACE( c->req, l, !0 );
845                 /* add text lines */
846                 while ( b<end ) {
847                         int conti = 0;
848                         switch ( c->prt ) {
849                         case 0: /* at beginning of line -- start new field */
850                                 if ( LF == *b ) /* empty line */
851                                         return 1;
852                                 if ( TAB != *b || !c->req->len )
853                                         RADD( c->req, 0,0,end-b, !0 );
854                                 else { /* binary mode continuation line */
855                                         conti = 1;
856                                         if ( ! c->bin ) {
857                                                 sMsg( LOG_INFO,
858                                                         "detected binary mode on con %s", c->nam );
859                                                 c->bin = 1;
860                                         }
861                                         RSPACE( c->req, end-b, !0 );
862                                 }
863                                 if ( ! c->req )
864                                         return -ERR_NOMEM;
865                                 c->prt = 1;
866                         case 1: /* add to last field */
867                                 f = c->req->field + c->req->len-1;
868                                 v = (unsigned char*)f->val;
869                                 p = v + f->len;
870                                 if ( conti ) {
871                                         *p++ = LF;
872                                         b++;
873                                 }
874                                 if ( c->bin ) {
875                                         for ( ; b<end && LF != (*p = *b++); p++ )
876                                                 ;
877                                 } else 
878                                         for ( ; b<end && LF != (*p = *b++); p++ )
879                                                 if ( VT == *p ) /* convert VTABs */
880                                                         *p = LF; /* back to newlines */
881                                 c->req->used += (p - v) - f->len;
882                                 f->len = p - v;
883                                 if ( LF == b[-1] ) {
884                                         int ret = a2il( f->val, f->len, &f->tag );
885                                         if ( ret ) {
886                                                 if ( ret < f->len && TAB == v[ret] )
887                                                         ret++;
888                                                 if ( ret < f->len )
889                                                         memmove( v, v+ret, f->len - ret );
890                                                 f->len -= ret;
891                                         }
892                                         ret = c->srv->app( c, LSV_APPARG );
893                                         c->prt = 0;
894                                         if ( ret )
895                                                 return ret;
896                                 }
897                                 sMsg( LOG_INFO, "prs from %s: [%2d] %3d = '%.*s'",
898                                         c->nam, c->req->len-1, f->tag, f->len, f->val  );
899                         }
900                 }
901                 return 0;
902         }       /* case LIO_SPUSH */
903         case LIO_SFLUSH:
904                 if ( s->b.fill <= s->b.done )
905                         return 0;
906         case LIO_SCLOSE:
907                 if ( !(LIO_OUT & s->file) )
908                         return -ERR_INVAL;
909                 /*
910                         this switch is not protected, since receiver doesn't care
911                 */
912                 if ( c->stg < LSV_STGCOM )
913                         c->stg = LSV_STGCOM;
914                 if ( ! s->pos+s->b.done && c->ses->res ) {
915                         char num[32];
916                         int n = c->ses->res->len;
917                         sMsg( LOG_INFO, "com %d fields to %s", c->ses->res->len, c->nam );
918                         for ( f = c->ses->res->field; n--; f++ ) {
919                                 const char *v = f->val, *nl;
920                                 int vl = f->len;
921                                 int nlen = i2a( num, f->tag );
922                                 num[nlen++] = TAB;
923                                 lio_sout( s, &c->flt, num, nlen );
924                                 while ( vl && (nl = memchr(v,'\n',vl)) ) {
925                                         int ll = 1+(nl-v);
926                                         if ( c->bin ) {
927                                                 lio_sout( s, &c->flt, v, ll );
928                                                 lio_sout( s, &c->flt, &tab, 1 );
929                                         } else {
930                                                 lio_sout( s, &c->flt, v, ll-1 );
931                                                 lio_sout( s, &c->flt, &vt, 1 );
932                                         }
933                                         v += ll;
934                                         vl -= ll;
935                                 }
936                                 if ( vl )
937                                         lio_sout( s, &c->flt, v, vl );
938                                 lio_sout( s, &c->flt, &lf, 1 );
939                         }
940                 }
941                 if ( s->b.fill )
942                         s->b.done = s->b.fill = 0;
943                 /*
944                 {
945                         lio_sout( s, &c->flt, 0, 0 ); 
946                         lio_stdio( s, LIO_SFLUSH );
947                 }
948                 */
949                 if ( LIO_SCLOSE == op )
950                         lio_sout( s, &c->flt, &lf, 1 );
951                 lio_sout( s, &c->flt, 0, 0 ); /* finally flush */
952                 return 0;
953         }
954         return lio_stream( s, op ); /* hmm -- SPURGE ? */
955 }       /* svPlain */
956
957
958 /* the echo application */
959 int svEcho ( Con *c, int task )
960 {
961 #ifdef LSVECHO_DELAY
962         static Tm tm = { LLL(50) };
963 #endif
964         Field *f;
965         int i;
966         if ( task ) { /* preparing: we don't care */
967                 if ( LSV_APPGOT == task )
968                         c->grp = 1; /* don't require mainthead */
969                 return 0;
970         }
971         /* run it */
972         sMsg( LOG_INFO, "echo %d: %d fields to %s",
973                 svCur()->id, c->req->len, c->nam );
974 #ifdef LSVECHO_DELAY
975         timeSleep( &tm );
976 #endif
977         for ( i=c->req->len, f = c->req->field; i--; f++ ) {
978                 RADDF( c->ses->res, f, !0 );
979                 /*
980                 plain protocol actually does not support plain output ...
981                 sMsg( c->ses, 0, "%d = '%.*s'\n", f->tag, f->len, f->val );
982                 */
983         }
984         return 0;
985 }       /* svEcho */
986
#if 0
/* sample driver (disabled): an echo server speaking the plain protocol,
	listening on the port given as first argument (svRun's default if none)
*/
int main ( int argc, const char **argv )
{
	Srv echo;

	memset( &echo, 0, sizeof(echo) );
	/* use the handlers by the names they are defined with in this file */
	echo.prt = svPlain;
	echo.app = svEcho;
	cOpen(0);
	cLog( LOG_DEBUG, 0 );
	return svRun( &echo, 1<argc ? argv[1] : 0 );
}
#endif