fixed parsing for science direct html with more than one <a href=> per one <tr>
[webpac] / openisis / lbt.c
1 /*
2         openisis - an open implementation of the CDS/ISIS database
3         Version 0.8.x (patchlevel see file Version)
4         Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org
5
6         This library is free software; you can redistribute it and/or
7         modify it under the terms of the GNU Lesser General Public
8         License as published by the Free Software Foundation; either
9         version 2.1 of the License, or (at your option) any later version.
10
11         This library is distributed in the hope that it will be useful,
12         but WITHOUT ANY WARRANTY; without even the implied warranty of
13         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14         Lesser General Public License for more details.
15
16         You should have received a copy of the GNU Lesser General Public
17         License along with this library; if not, write to the Free Software
18         Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
20         see README for more information
21 EOH */
22
23 /*
24         $Id: lbt.c,v 1.35 2003/05/27 11:03:30 kripke Exp $
25         the btree.
26 */
27
28 #include <stdlib.h>
29 #include <string.h>
30
31 /* special
32 #if defined( __GNUC__ ) && defined ( alloca )
33 #include <alloca.h>
34 #endif
35 */
36 #if defined( sparc ) || defined( __ppc__ )
37 #       define LDB_BIG_ENDIAN
38 #endif
39
40 #include "lio.h"
41 #include "luti.h"
42 #include "lbt.h"
43
44
45 /*
46 probably all relevant cases can be handled by the more
47 efficient approach of storing recoded key a la strxfrm
48 */
49 #define LBT_USECMP 0
50
51
52 /* ************************************************************
53         private types
54 */
55
56 enum {
57         /* little endian magic text */
58         LBT_OPEN = 'O' | 'P'<<8 | 'E'<<16 | 'N'<<24,
59         LBT_ISIS = 'I' | 'S'<<8 | 'I'<<16 | 'S'<<24
60 };
61
62
63 enum {
64         LBT_LEVELS  = 16, /* max depth */
65         LBT_POSBITS = 13, /* bits for position within block */
66         LBT_POSMASK = 0x1fff,
67         LBT_POSMAX  = LBT_POSMASK /* obviously :) */
68 };
69
70 typedef struct Dict { /* dictionary of block entries */
71         unsigned short pos; /* 13 bits position of entry, 2 bits val for leaves */
72         unsigned char  vln; /* length of entry's value */
73                 /* for inner nodes, this is the length of qualifying
74                         second key segment (either 0 or bt->vsz <= 23).
75                         for leaves, this, together with higher bits of pos,
76                         is the number of values of bt->vsz each.
77                 */
78         unsigned char  kln; /* length of entry's key */
79 /*
80         the following access macros are needed for leaves only
81 */
82 #define LBT_POS( d ) (LBT_POSMASK & (d)->pos)
83 #define LBT_VLN( d ) ((d)->vln | (~LBT_POSMASK & (d)->pos)>>(LBT_POSBITS-8))
84 #define LBT_POSADD( d, a ) do { \
85                 (d)->pos = (~LBT_POSMASK & (d)->pos) | (LBT_POSMASK & ((d)->pos + (a))); \
86         } while (0)
87 #define LBT_VLNADD( d, a ) do { \
88                 unsigned _vln = LBT_VLN( d ) + (a); \
89                 (d)->pos = (LBT_POSMASK & (d)->pos) \
90                         | (~LBT_POSMASK & ((_vln)<<(LBT_POSBITS-8))); \
91                 (d)->vln = (unsigned char)(_vln); \
92         } while (0)
93 #define LBT_PV( d, p, v ) do { \
94                 (d)->pos = (LBT_POSMASK & (p)) | (~LBT_POSMASK & ((v)<<(LBT_POSBITS-8))); \
95                 (d)->vln = (unsigned char)(v); \
96         } while (0)
97 #define LBT_PVK( d, p, v, k ) do { \
98                 LBT_PV( d, p, v ); \
99                 (d)->kln = (unsigned char)(k); \
100         } while (0)
101 } Dict;
102
103
104 /* guard space around block */
105 #ifndef NDEBUG
106 #       define LBT_GUARD 64
107 /* template for guard space -- blank or pattern */
108 static unsigned char guard[LBT_GUARD];
109 #else
110 #       define LBT_GUARD 0
111 #endif
112
113
114 /*
115         TODO: check the possible performance gain in findkey
116         from separating disk block and administrative data
117         and aligning disk blocks on PAGE boundaries in memory
118 */
119 typedef struct Block {
120 #if LBT_GUARD
121         unsigned char  grd[LBT_GUARD];
122 #endif
123         unsigned char  ird; /* is reading */
124         unsigned char  wrd; /* #threads waiting for read */
125         unsigned char  lck; /* is write locked */
126         unsigned char  wlk; /* #threads waiting for write lock */
127 #define LBT_BUSY( b ) (*(int*)&(b)->ird) /* any of 1st 4 bytes != 0 */
128         struct Block  *nhs; /* next in hash chain */
129         struct Block  *nru; /* next (more) recently used */
130         /* TODO: possibly double-link lru list, so we can unlink on every access ? */
131         /* the disk block -- from here on bsz bytes are on disk */
132 #define LBT_BOFF ((char *)&((Block*)0)->num - (char*)0)
133 #define LBT_BASE( b ) ((unsigned char *)&(b)->num)
134         unsigned       num; /* number of this block, 0 is root */
135         unsigned       nxt; /* right sibling, "OPEN" or "ISIS" for root */
136         /* typ,key,col are redundant in non-root blocks, maybe abused ... */
137         unsigned char  typ; /* type: bsz, vsz, flags */
138         unsigned char  key; /* max key length */
139         unsigned char  col; /* collation */
140         unsigned char  lev; /* level over bottom, 0 for leaves */
141         unsigned short ent; /* 10 bits # entries */
142 #define LBT_ENT( b ) (b)->ent /* currently other bits arent abused */
143         unsigned short stk; /* 13 bits offset of keyspace stack */
144 /* offset of dict RELATIVE to base */
145 #define LBT_DOFF ((char *)&((Block*)0)->dct - (char*)&((Block*)0)->num)
146         Dict           dct[1];
147 }       Block;
148
149 /*
150         LAYOUT:
151         dct is an array of dictionary entries in increasing key order
152         stk is growing downwards from block end towards dict
153         position of entry i is b->stk <= LBT_POS(b->dct+i) < bt->bsz
154         decreasing on i (i.e. stack space is in reverse ordering)
155
156         at LBT_POS(b->dct+i), there are b->dct[i].kln bytes key
157         for a leaf entry, the key is followed by
158         LBT_VLN(b->dct+i) (between 0..1023 incl.) values
159         of bt->vsz (between 8..23 incl) each
160         for an inner entry, the key is followed by
161         4 bytes block number and optionally bt->vsz qualifying value
162
163         SIZES:
164         block size >= 1024
165         block base overhead LBT_DOFF is 16 bytes
166         avail size >= 1008
167         Dict entry is 4 bytes
168         max. used stack space per entry is:
169         inner node: kln+4+bt->vsz <= 255+4+23 = 282
170         new leaf entry (one value): kln+bt->vsz <= 278
171         max total space for a new entry incl. dict is 286 < avail size/3
172 */
173
174 typedef struct Chunk {
175         struct Chunk *nxt; 
176 }       Chunk;
177
178
179 enum {
180         /* number of levels for lru list */
181         LBT_LRULEV  = sizeof(((Idx*)0)->lru)/sizeof(((Idx*)0)->lru[0]),
182         LBT_ENTBITS = 10, /* bits for # enries */
183         LBT_ENTMASK = 0x03ff,
184         LBT_ENTMAX  = LBT_ENTMASK,
185         LBT_MATCH   = 0x8000, /* entry matched */
186         LBT_ENTSIZE = 8 /* minimum cost of entry: 4 byte dict, 4 byte val */
187 };
188
189
190 typedef struct Batch {
191         Block   *cur;
192         unsigned got; /* current for this key */
193         unsigned tot; /* total for this key */
194         unsigned let;
195         /* stats */
196         unsigned keys;
197         unsigned keyl;
198         unsigned maxl;
199         unsigned vals;
200         unsigned maxv;
201         unsigned span;
202         unsigned free;
203         /* buffer */
204         Key      key;
205         Val      val[LBT_ENTMAX];
206 }       Batch;
207
208
209 typedef enum {
210         LBT_RD, /* read -- no lock */
211         LBT_WR, /* write */
212         LBT_EX /* exclusive */
213 } lbt_get;
214
215
216 typedef struct {        /* structure filled by findkey */
217         Idx           *bt;      /* the tree */
218         Key           *key; /* the key searched */
219         Block         *b[LBT_LEVELS]; /* path to leaf, b[0] is leaf */
220         unsigned       n[LBT_LEVELS]; /* block numbers of path */
221         unsigned short e[LBT_LEVELS]; /* entries in path */
222 }       FindKey;
223
224
225 typedef struct {
226         unsigned get;
227         unsigned miss;
228         unsigned cmp;
229         unsigned key;
230 }       Stat;
231
232 /* buffer to print a value to.
233         lowest 11 bytes are printed as 4 shorts + one int < 4*5+10 = 30 digits+5sep
234         higher 12 bytes are printed hex = 24 bytes.
235 */
236 typedef char PrVal[64];
237
238
239 /* ************************************************************
240         private data
241 */
242
243 /*
244         stat access is not guarded by memory barrier.
245         may be wrong when running on multiple processors
246 */
247 static Stat stat;
248
249
250 /* ************************************************************
251         private functions
252 */
253
254 #if defined( sparc )
255 static unsigned GETINT ( void *m ) /* bus error safe version of *(int*) */
256 {
257         unsigned l;
258         memcpy( &l, m, 4 );
259         return l;
260 }
261 static void SETINT ( void *m, unsigned i )
262 {
263         memcpy( m, &i, 4 );
264 }
265 #else
266 #define GETINT( m ) (*(unsigned*)(m))
267 #define SETINT( m, i ) (*(unsigned*)(m) = (i))
268 #endif
269
270 static void mkval ( Val *v, Hit *ifp )
271 {
272         unsigned char *c = v->byt;
273         switch ( v->len ) {
274         default: memset( c, 0, v->len-11 ); c += v->len-11;
275         case 11: *c++ = (unsigned char) (ifp->dbn >> 8);
276         case 10: *c++ = (unsigned char)  ifp->dbn;
277         case 9:  *c++ = (unsigned char) (ifp->mfn >> 24);
278         case 8:  *c++ = (unsigned char) (ifp->mfn >> 16);
279         case 7:  *c++ = (unsigned char) (ifp->mfn >> 8);
280         case 6:  *c++ = (unsigned char)  ifp->mfn;
281         case 5:  *c++ = (unsigned char) (ifp->tag >> 8);
282         case 4:  *c++ = (unsigned char)  ifp->tag;
283         case 3:  *c++ = (unsigned char)  ifp->occ;
284         case 2:  *c++ = (unsigned char) (ifp->pos >> 8);
285         case 1:  *c++ = (unsigned char)  ifp->pos;
286         case 0:  ;
287         }
288 }       /* mkval */
289
290
291 static Hit *rdval ( Hit *ifp, unsigned char *c, unsigned vsz )
292 {
293         ifp->mfn = ifp->tag = ifp->occ = ifp->pos = ifp->dbn = 0;
294         switch ( vsz ) {
295         default: c += vsz-11;
296         case 11: ifp->dbn  = (unsigned short)*c++ << 8;
297         case 10: ifp->dbn |=                 *c++;
298         case 9:  ifp->mfn  = (unsigned  int)*c++ << 24;
299         case 8:  ifp->mfn |= (unsigned  int)*c++ << 16;
300         case 7:  ifp->mfn |= (unsigned  int)*c++ << 8;
301         case 6:  ifp->mfn |=                 *c++;
302         case 5:  ifp->tag  = (unsigned short)*c++ << 8;
303         case 4:  ifp->tag |=                 *c++;
304         case 3:  ifp->occ  =                 *c++;
305         case 2:  ifp->pos  = (unsigned short)*c++ << 8;
306         case 1:  ifp->pos |=                 *c++;
307         case 0:  ;
308         }
309         return ifp;
310 }       /* rdval */
311
312
313 static char *prval ( PrVal buf, unsigned char *c, unsigned vsz )
314 {
315         char *d = buf;
316         Hit h;
317         if ( ! vsz ) {
318                 buf[0] = '#';
319                 buf[1] = 0;
320                 return buf;
321         }
322         if ( 23 < vsz )
323                 vsz = 23;
324         if ( 11 < vsz ) {
325                 while ( 11 < vsz-- ) {
326                         *d++ = luti_hex[ *c >> 4 ];
327                         *d++ = luti_hex[ *c++ & 0xf ];
328                 }
329                 *d++ = '.';
330         }
331         rdval( &h, c, vsz );
332         d += u2a( d, h.dbn ); *d++ = '.';
333         d += u2a( d, h.mfn ); *d++ = '.';
334         d += u2a( d, h.tag ); *d++ = '.';
335         d += u2a( d, h.occ ); *d++ = '.';
336         d += u2a( d, h.pos ); *d++ = '.';
337         *d = 0;
338         return buf;
339 }       /* prval */
340
341 /* for PRVAL f->key->val */
342 #define PRVAL( buf, v ) prval( buf, v.byt, v.len )
343
344
345 /* ************************************************************
346         block I/O & cache mgmt
347 */
348
349 static void iniblk ( Block *b, Idx *bt, unsigned num, unsigned char lev )
350 {
351         memset( b, 0, LBT_BOFF + bt->bsz + LBT_GUARD );
352         b->num = num;
353         b->typ = bt->typ;
354         b->key = bt->key;
355         b->col = bt->col;
356         b->lev = lev;
357         b->stk = bt->bsz;
358 }       /* iniblk */
359
360
361 static int chkblk ( Block *b, Idx *bt )
362 {
363 #if 0
364         (void)b; (void)bt;
365 #else
366         int i;
367         unsigned short eod = LBT_DOFF + b->ent*sizeof(Dict), siz, pos, vln, key;
368         unsigned end = bt->bsz, tot = 0;
369         unsigned char *base = LBT_BASE(b);
370 #if LBT_GUARD
371         if ( memcmp( guard, b->grd, LBT_GUARD ) )
372                 return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
373         if ( memcmp( guard, LBT_BASE(b)+bt->bsz, LBT_GUARD ) )
374                 return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
375 #endif
376         if ( b->num > bt->len || (b->num && b->nxt > bt->len) )
377                 return log_msg( ERR_TRASH,
378                         "bad num %d / nxt %d on lev %d", b->num, b->nxt, b->lev );
379         if ( b->typ != bt->typ || b->key != bt->key || b->col != bt->col )
380                 return log_msg( ERR_TRASH,
381                         "bad sig 0x%02x,%d,%d in block %d want 0x%02x,%d,%d",
382                         b->typ, b->key, b->col, b->num, bt->typ, bt->key, bt->col );
383         if ( b->ent > bt->bsz / LBT_ENTSIZE || b->stk > bt->bsz || b->stk < eod )
384                 return log_msg( ERR_TRASH,
385                         "bad ent/stk %d,%d in block %d eod %d sz %d",
386                         b->ent, b->stk, b->num, eod, bt->bsz );
387         for ( i=0; i<b->ent; i++ ) {
388                 Dict *d = &b->dct[i];
389                 pos = LBT_POS( d );
390                 vln = LBT_VLN( d );
391                 key = d->kln;
392                 siz = key + (b->lev ? vln+4u : vln * bt->vsz);
393                 if ( pos < b->stk || end != (unsigned)pos + siz )
394                         return log_msg( ERR_TRASH,
395                                 "bad pos/vln/key %d/%d/%d in entry %d of block %d"
396                                 " stack %d sz %d end %d expected %d",
397                                 pos, vln, key, i, b->num, b->stk, siz, pos+siz, end );
398                 if ( b->lev ) {
399                         unsigned lnk;
400                         if ( vln && vln != +bt->vsz )
401                                 return log_msg( ERR_TRASH,
402                                         "bad vln %d in entry %d of block %d vsz %d",
403                                         vln, i, b->num, bt->vsz );
404                         lnk = GETINT( base + pos + key + vln );
405                         if ( lnk >= bt->len )
406                                 return log_msg( ERR_TRASH,
407                                         "bad link %u in entry %d of block %d",
408                                         lnk, i, b->num );
409                 }
410 #if 1 && ! defined( NDEBUG )
411                 if ( i ) {
412                         unsigned short op = LBT_POS( d-1 ), okl = d[-1].kln;
413                         int mc = memcmp( base+pos, base+op, key<okl ? key : okl ), j;
414                         if ( 0 < mc || (!mc && 0 < (mc = (int)key - (int)okl)) )
415                                 goto ok;
416                         j = 0;
417                         if ( !mc && b->lev && vln ) { /* compare values */
418                                 unsigned ovln = d[-1].vln;
419                                 if ( ! ovln || 0 < (j = memcmp( base+pos+key, base+op+okl, vln )) )
420                                         goto ok;
421                         }
422                         return log_msg( ERR_TRASH,
423                                 "bad key order entry %d of block %d lev %d"
424                                 " mc %d j %d key %d okl %d vln %d",
425                                 i, b->num, b->lev, mc, j, key, okl, vln
426                         );
427                 }
428         ok:
429 #endif
430                 tot += siz;
431                 end = pos;
432         }
433         if ( tot > bt->bsz - b->stk )
434                 return log_msg( ERR_TRASH,
435                         "bad total entry size %d in block %d stack %d",
436                         tot, b->num, b->stk );
437 #ifndef NDEBUG
438         if ( LOG_DO( LOG_TRACE ) ) {
439                 log_msg( LOG_TRACE, 
440                         "block %d lev %d sig 0x%02x,%d,%d ent %d eod %d stk %d",
441                         b->num, b->lev, b->typ, b->key, b->col, b->ent, eod, b->stk
442                 );
443                 for ( i=0; i<2; i++ ) {
444                         int j = i ? b->ent-1 : 0;
445                         Dict *d = &b->dct[j];
446                         if ( i >= b->ent ) /* if b->ent is 0 or 1, log no or one key */
447                                 break;
448                         pos = LBT_POS( d );
449                         vln = LBT_VLN( d );
450                         key = d->kln;
451                         end = pos + b->lev ? vln+4u : vln * bt->vsz;
452                         /* start of value */
453                         if ( b->lev )
454                                 log_msg( LOG_TRACE, "key %d '%.*s'/%d -> %d",
455                                         j, key, base+pos, vln, GETINT(base+pos+key+vln) );
456                         else {
457                                 Hit h;
458                                 rdval( &h, base+pos+key, bt->vsz );
459                                 log_msg( LOG_TRACE, "key %d '%.*s'[%d] = %d[%d,%d,%d]",
460                                         j, key, base+pos, vln, h.mfn, h.tag, h.occ, h.pos );
461                         }
462                 }
463         }
464 #endif
465 #endif
466         return 0;
467 }       /* chkblk */
468
469
470 static Block *addchunk ( Idx *bt, unsigned cnt )
471 {
472         unsigned ibsz = LBT_BOFF + bt->bsz + LBT_GUARD; /* internal block size */
473         /* increase cnt so it's > 0 and add one extra to return */
474         char *mem = (char *)mAlloc( sizeof(Chunk) + (1 + ++cnt)*ibsz );
475         Chunk *chk = (Chunk *)mem;
476         if ( ! mem )
477                 return 0;
478         /* chain the chunk */
479         chk->nxt = bt->mem;
480         bt->mem = chk;
481         mem += sizeof(Chunk);
482         bt->clen += cnt;
483         /* add cnt blocks to level 0 lru list */
484         if ( ! bt->mru[0] ) /* level 0 cache is empty */
485                 bt->mru[0] = (Block *)mem;
486         for ( ; cnt--; mem += ibsz ) {
487                 Block *b = (Block *)mem;
488                 iniblk( b, bt, 0 , 0 ); /* initialise empty block */
489                 b->nru = bt->lru[0];
490                 bt->lru[0] = b;
491         }
492         iniblk( (Block *)mem, bt, 0 , 0 );
493         return (Block *)mem;
494 }       /* addchunk */
495
496
497 static int readblk ( Block *b, Idx *bt, unsigned num )
498 {
499         int got = lio_pread( &bt->fd, LBT_BASE(b), bt->bsz, num*bt->bsz );
500         log_msg( LOG_VERBOSE, "reading block %u got %u", num, got );
501         if ( bt->bsz != (unsigned)got )
502                 return log_msg( LOG_SYSERR, "\twhen reading bt block %d", num );
503         if ( b->num != num )
504                 return log_msg( ERR_TRASH, "bad num %d in block %d", b->num, num );
505         return chkblk( b, bt );
506 } /* readblk */
507
508
509 static void unlock ( Block *b )
510 {
511         b->lck = 0;
512         if ( b->wlk )
513                 (void)LIO_WAKE( b->num );
514 }
515
516 #define ULK( b ) do { \
517         if ( (b)->lck ) { (b)->lck = 0; if ( b->wlk ) unlock( b ); } \
518         } while (0)
519
520
521 static int writeblk ( Block *b, Idx *bt, int ulk )
522 {
523         int got;
524         if ( chkblk( b, bt ) ) /* don't write no shize */
525                 return log_msg( ERR_TRASH, "\tbefore writing bt block %d", b->num );
526         got = lio_pwrite( &bt->fd, LBT_BASE(b), bt->bsz, b->num*bt->bsz );
527         log_msg( LOG_VERBOSE, "writing block %u got %u", b->num, got );
528         if ( ulk )
529                 ULK( b );
530         if ( bt->bsz != (unsigned)got )
531                 return log_msg( LOG_SYSERR, "\twhen writing bt block %d", b->num );
532         return 0;
533 } /* writeblk */
534
535 #define WRULK( b, bt ) writeblk( b, bt, 1 )
536
537
538 #define NEWBLK( bt, lev, lock ) getblk( bt, bt->len, lev, lock )
539 /**
540         get block #num.
541         1) locate block #num in cache.
542         2) if it is found:
543                 if it is READING, or we want to lock and it is locked,
544                 incr # waiters, wait for it.
545         3) if it is not found:
546                 locate a free cache entry, extending the cache if necessary,
547                 set the num, set the READING flag, go read it.
548                 after reading, if there are waiters, wake them up.
549                 since we are the first to get a hold on the block,
550                 we may write lock it, if we want.
551         4) return the block.
552 */
553 static Block *getblk ( Idx *bt, unsigned num,
554         unsigned char lev, lbt_get lock )
555 {
556         Block *b;
557         stat.get++; /* dirty */
558         if ( !(stat.get & 0x3fff) ) { /* every 16K */
559                 static unsigned lmiss;
560                 log_msg( LOG_INFO,
561                         "searched %i keys in %u blks cache %d %d%%"
562                         " depth %u cmp/get/miss %u/%u/%u %d%% (%d %d%%)",
563                         stat.key, bt->len, bt->clen, bt->clen*100 / bt->len,
564                         bt->root->lev, stat.cmp, stat.get,
565                         stat.miss, stat.miss*100 / stat.get,
566                         (stat.miss - lmiss), (stat.miss - lmiss)*100 / 0x4000
567                 );
568                 lmiss = stat.miss;
569         }
570 startover: /* only used on absurd congestion */
571         b = 0;
572         LOG_DBG( LOG_DEBUG, "get bt block %u %u %d", num, lev, lock );
573         if ( ! num )
574                 b = bt->root;
575         else if ( num < bt->len ) {
576                 b = bt->hash[ num % bt->hlen ];
577                 while ( b && num != b->num )
578                         b = b->nhs;
579         }
580         if ( b )
581                 while ( b->ird || (lock && b->lck) ) {
582                         int wrd = b->ird;
583                         int wlk = lock && b->lck;
584                         LOG_DBG( LOG_VERBOSE, "lck on block %u %u r%d l%d",
585                                 num, lev, b->ird, b->lck );
586                         if ( LBT_EX <= lock ) {
587                                 log_msg( LOG_ERROR, "concurrent mod on EX lock" );
588                                 return 0;
589                         }
590                         if ( ! lio_lock ) {
591                                 log_msg( LOG_FATAL, "bad MT environment: concurrent mod, no lock" );
592                                 return 0;
593                         }
594                         if ( wrd && 0xff != b->wrd )
595                                 b->wrd++;
596                         if ( wlk && 0xff != b->wlk )
597                                 b->wlk++;
598                         (void)LIO_WAIT( num ); /* zzzzz .... sleep tight */
599                         /* some hours later */
600                         if ( num != b->num ) {
601                                 log_msg( LOG_WARN, "heavy congestion: block %d has gone", num );
602                                 goto startover;
603                         }
604                         if ( wrd && b->wrd )
605                                 b->wrd--;
606                         if ( wlk && b->wlk )
607                                 b->wlk--;
608                 }
609         else { /* not found or new block */
610                 /* find free block */
611                 int i = 0, llev, err = 0;
612                 stat.miss++; /* dirty */
613                 LOG_DBG( LOG_DEBUG, "bt block %u %u not found", num, lev );
614                 for ( ; i < (int)LBT_LRULEV; i++ ) {
615                         /* check lru list for level i */
616                         Block **bp = &bt->lru[i];
617                         while ( (b = *bp) && LBT_BUSY(b) )
618                                 bp = &b->nru;
619                         if ( ! b ) /* try next level */
620                                 continue;
621                         *bp = b->nru; /* dequeue */
622                         if ( bt->mru[i] == b ) /* was the tail */
623                                 bt->mru[i] = 0;
624                         break;
625                 }
626                 if ( ! b ) {
627                         log_msg( LOG_WARN, "no free blocks in cache" );
628                         if ( !(b = addchunk( bt, 64 )) )
629                                 return 0;
630                         /* TODO: should we also extend the hash ? */
631                 } else if ( b->num ) { /* got block -- should be hashed somewhere */
632                         Block **bp = &bt->hash[ b->num % bt->hlen ];
633                         while ( *bp && *bp != b )
634                                 bp = &(*bp)->nhs;
635                         if ( *bp == b )
636                                 *bp = b->nhs; /* dehash */
637                         /* else ... hmmm should have found it */
638                 }
639                 assert( ! LBT_BUSY( b ) );
640                 iniblk( b, bt, num, lev );
641                 /* hash it, so waiters can find it */
642                 b->nhs = bt->hash[ b->num % bt->hlen ];
643                 bt->hash[ b->num % bt->hlen ] = b;
644                 if ( num == bt->len ) { /* extend */
645                         b->lev = lev;
646                         bt->len++;
647                         /* TODO: should be configurable */
648                         if ( bt->len > bt->clen << 5 && bt->clen < 0x4000 )
649                                 addchunk( bt, bt->clen / 4 );
650                 } else {
651                         b->ird = 1;
652                         (void)LIO_RELE();
653                         err = readblk( b, bt, num );
654                         (void)LIO_LOCK();
655                         b->ird = 0;
656                         if ( ! err && lev != b->lev )
657                                 err = log_msg( ERR_TRASH, "bad lev %d in block %d want %d",
658                                         b->lev, b->num, lev );
659                         if ( err ) {
660                                 Block **bp = &bt->hash[ num % bt->hlen ];
661                                 while ( *bp && *bp != b )
662                                         bp = &(*bp)->nhs;
663                                 if ( *bp == b )
664                                         *bp = b->nhs; /* dehash */
665                                 b->lev = 0; /* add to level 0 lru list */
666                                 b->num = 0; /* tell others that it isn't the expected block */
667                         }
668                 }
669                 /* add to it's levels lru list */
670                 llev = LBT_LRULEV > b->lev ? b->lev : LBT_LRULEV-1;
671                 if ( bt->mru[ llev ] )
672                         bt->mru[ llev ]->nru = b;
673                 else 
674                         bt->lru[ llev ] = b;
675                 (bt->mru[ llev ] = b)->nru = 0;
676                 if ( b->wrd )
677                         (void)LIO_WAKE( num );
678                 if ( err ) {
679                         b->wrd = b->wlk = 0; /* mark block unused */
680                         return 0;
681                 }
682         }       /* got free block */
683         if ( LBT_WR == lock ) {
684                 if ( b->lck ) {
685                         log_msg( LOG_FATAL, "bt consi: unexpected lock on %u", num );
686                         return 0;
687                 }
688                 b->lck = 1;
689         }
690         return b;
691 }       /* getblk */
692
693
694 /** binsearch position for key.
695         position is either the index of the matching entry (0..b->ent-1)
696         or the index (0..b->ent) where key should be inserted.
697         @return position for key (0..b->ent), |ifyed with LBT_MATCH, if found
698 */
699 static unsigned short keypos (
700 #if LBT_USECMP
701         Idx *bt,
702 #define KEYPOS( bt, b, key ) keypos( bt, b, key )
703 #else
704 #define KEYPOS( bt, b, key ) keypos( b, key )
705 #endif
706         Block *b, Key *key )
707 {
708         unsigned char *base = LBT_BASE( b );
709         unsigned char *kbt = key->byt;
710         unsigned char  kln = key->len;
711         unsigned short l = 0, r = b->ent;
712         Dict          *d = b->dct;
713         /* binsearch invariant:
714                 key is < all entries i >= r (we do know relation at r)
715                 key is > all entries i < l (don't know relation at l)
716         */
717         while ( l < r ) { /* invariant: l <= keypos < r */
718                 unsigned short i = (l + r) >> 1; /* l <= i < r */
719                 unsigned short pos = LBT_POS(&d[i]);
720                 unsigned char cln = d[i].kln;
721                 int cmp =
722 #if LBT_USECMP
723                         bt->cmp ? bt->cmp( kbt, base+pos, kln<cln ? kln : cln ) :
724 #endif
725                         memcmp( kbt, base+pos, kln<cln ? kln : cln );
726                 stat.cmp++; /* dirty */
727                 if ( ! cmp && !(cmp = (int)kln - cln) ) {
728                         /* match on main key -- what about the value ? */
729                         if ( ! b->lev /* ignore on leaves -- no duplicate keys here */
730                                 || (!d[i].vln && !key->val.len) /* no value */
731                         )
732                                 return LBT_MATCH | i;
733                         /* so we're on an inner node */
734                         if ( ! d[i].vln ) /* common case */
735                                 cmp = 1;
736                         else if ( ! key->val.len )
737                                 cmp = -1;
738                         else {
739                                 unsigned char vln = LBT_VLN(&d[i]);
740                                 int c = key->val.len < d[i].vln ? key->val.len : vln;
741                                 if ( !(cmp = memcmp( key->val.byt, base+pos+cln, c ))
742                                         && !(cmp = (int) key->val.len - vln)
743                                 )
744                                 return LBT_MATCH | i;
745                         }
746                 }
747                 if ( cmp > 0 )
748                         l = i+1;
749                 else
750                         r = i;
751         }
752         return l; /* == r */
753 }       /* keypos */
754
755
756 /** find position for (leaf) val.
757         search v in array b of n values of size sz
758         @return position for value (0..n), |ifyed with LBT_MATCH, if found
759 */
760 static unsigned short valpos ( unsigned char *b,
761         unsigned short n, unsigned char sz, unsigned char *v )
762 {
763         unsigned short l = 0, r = n;
764         /* binsearch invariant:
765                 v is < all values i >= r (we do know relation at r)
766                 v is > all values i < l (don't know relation at l)
767         */
768         while ( l < r ) { /* invariant: l <= valpos < r */
769                 unsigned short i = (l + r) >> 1; /* l <= i < r */
770                 int cmp = memcmp( v, b+i*sz, sz );
771                 if ( ! cmp )
772                         return LBT_MATCH | i;
773                 if ( cmp > 0 )
774                         l = i+1;
775                 else
776                         r = i;
777         }
778         return l; /* == r */
779 }       /* valpos */
780
781
782 #ifndef NDEBUG
783 static void TRACEKEY ( unsigned lev, FindKey *f )
784 {
785         int i;
786         unsigned n = 0;
787         Hit h;
788         PrVal pv;
789
790         if ( LOG_DONT( lev ) )
791                 return;
792         log_msg( lev, "key '%.*s' vln %d val %s",
793                 f->key->len, f->key->byt, f->key->val.len,
794                 PRVAL( pv, f->key->val ) );
795         if ( f->key->val.len ) {
796                 rdval( &h, f->key->val.byt, f->key->val.len );
797                 log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d", 
798                         h.dbn, h.mfn, h.tag, h.occ, h.pos );
799         }
800         for ( i=LBT_LEVELS; i--; ) {
801                 Block *b = f->b[i];
802                 unsigned on = n, e;
803                 Dict *d;
804                 if ( ! b  )
805                         continue;
806                 d = b->dct + (e = (LBT_ENTMASK & f->e[i]));
807                 if ( i )
808                         n = GETINT( LBT_BASE(b)+d->pos+d->kln+d->vln );
809                 log_msg( lev, "lev %d n %d e 0x%x = %d %c '%.*s' val %d %s -> %d%s",
810                         i, f->n[i], f->e[i], e, (LBT_MATCH & f->e[i]) ? 'M' : '-',
811                         d->kln, LBT_BASE(b)+d->pos, d->vln,
812                         prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln ),
813                         n, on == b->num ? "" : " LY!"
814                 );
815                 if ( d->vln ) {
816                         rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
817                         log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d", 
818                                 h.dbn, h.mfn, h.tag, h.occ, h.pos );
819                 }
820                 if ( e ) {
821                         d--;
822                         log_msg( lev, "\tleft '%.*s' val %d %s",
823                                 d->kln, LBT_BASE(b)+d->pos, d->vln,
824                                 prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln )
825                         );
826                         if ( d->vln ) {
827                                 rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
828                                 log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d", 
829                                         h.dbn, h.mfn, h.tag, h.occ, h.pos );
830                         }
831                 }
832         }
833 }       /* TRACEKEY */
834 #else
835 #define TRACEKEY( l, f ) (void)(f)
836 #endif
837
838
839 static int findkey ( FindKey *f, lbt_get lock )
840 {
841         int i;
842         Block *b = f->bt->root;
843         unsigned char *pkey = 0, pkln = 0, pvln = 0;
844         if ( b->lev >= LBT_LEVELS )
845                 return log_msg( ERR_TRASH, "tree is too deep: %u", b->lev );
846         stat.key++;
847         for ( i=LBT_LEVELS; i--; ) { /* paranoia */
848                 f->b[i] = 0;
849                 f->n[i] = f->e[i] = 0;
850         }
851         for ( i=b->lev;; ) {
852                 unsigned short e;
853                 unsigned  n;
854                 Dict *d;
855 #ifndef NDEBUG
856                 chkblk( b, f->bt );
857 #endif
858                 f->n[i] = (f->b[i] = b)->num;
859                 f->e[i] = e = KEYPOS( f->bt, b, f->key );
860                 LOG_DBG( LOG_DEBUG, "keypos %x", e );
861 #ifndef NDEBUG
862                 if ( pkey && b->ent ) {
863                         /* compare parent key against our first key
864                                 pkey must be equal for inner node and not greater for leave
865                         */
866                         int cmp; /* this - pkey >= 0 */
867                         unsigned char *ent, ln;
868                         d = b->dct;
869                         ent = LBT_BASE(b)+LBT_POS(d);
870                         ln = pkln < d->kln ? pkln : d->kln;
871                         cmp = ln ? memcmp( ent, pkey, ln ) : 0;
872                         if ( ! cmp )
873                                 cmp = (int)d->kln - (int)pkln;
874                         if ( ! cmp ) {
875                                 if ( ! d->vln || ! pvln )
876                                         cmp = (int)d->vln - (int)pvln;
877                                 else
878                                         cmp = memcmp( ent+d->kln, pkey+pkln, f->bt->vsz );
879                         }
880                         if ( cmp && (i || 0 > cmp) ) {
881                                 Hit h;
882                                 PrVal pv;
883                                 TRACEKEY( ERR_TRASH, f );
884                                 if ( d->vln ) {
885                                         rdval( &h, ent+d->kln, d->vln );
886                                         log_msg( ERR_TRASH, "\tfirst val db %d mfn %d tag %d occ %d pos %d", 
887                                                 h.dbn, h.mfn, h.tag, h.occ, h.pos );
888                                 }
889                                 return log_msg( ERR_TRASH, "\tbad first '%.*s' val %d %s",
890                                         d->kln, ent, d->vln, prval( pv, ent+d->kln, d->vln ) );
891                         }
892                 }
893 #endif
894                 /*
895                         here goes the Lehmann/Yao race detection:
896                         If we ran on right boundary, follow right link
897                 */
898                 while ( !(LBT_MATCH & e) && b->ent == e && b->nxt && b->num ) {
899                         Block *s;
900                         unsigned se;
901                         b->lck = LBT_WR; /* hold b while checking s */
902                         s = getblk( f->bt, b->nxt, i, i ? LBT_RD : lock );
903                         if ( i || ! lock )
904                                 ULK( b );
905                         else
906                                 b->lck = lock;
907                         se = KEYPOS( f->bt, s, f->key );
908                         if ( !se ) { /* lower than first */
909                                 ULK( s );
910                                 break;
911                         }
912                         LOG_DBG( LOG_DEBUG, "LYao %d -> %d lev %d for '%.*s' got 0x%x",
913                                 b->num, s->num, i, f->key->len, f->key->byt, se );
914                         /* trade b for s */
915                         ULK( b );
916                         f->n[i] = (f->b[i] = b = s)->num;
917                         e = se;
918                 }
919                 f->e[i] = e;
920                 if ( ! i )
921                         return 0;
922                 if ( !(LBT_MATCH & e) ) {
923                         if ( e )
924                                 f->e[i] = --e;
925                         else {
926                                 TRACEKEY( ERR_TRASH, f );
927                                 return log_msg( ERR_IDIOT,
928                                         "ran on left bound of block %u ent %u",
929                                         b->num, b->ent );
930                         }
931                 }
932                 d = b->dct + (LBT_ENTMASK & e);
933                 /* direct access ok in inner node */
934                 pkey = LBT_BASE(b)+d->pos;
935                 pkln = d->kln;
936                 pvln = d->vln;
937                 n = GETINT( pkey+pkln+pvln );
938                 i--;
939                 b = getblk( f->bt, n, i, i ? LBT_RD : lock );
940         }
941         return -1; /* go away */
942 }       /* findkey */
943
944
945 static int mktree ( Idx *bt )
946 {
947         Batch *ba = bt->bat;
948         unsigned char lev = 0; /* the level to be indexed */
949         unsigned left = 1; /* block on child level */
950         /* stats */
951         unsigned size = bt->len * bt->bsz;
952         unsigned sval = ba->vals * bt->vsz;
953         unsigned sove = bt->len * LBT_DOFF + sizeof(Dict)*(ba->keys+ba->span);
954         unsigned iblk = 0;
955         unsigned ikey = 0;
956         unsigned ikeyl = 0;
957         unsigned ipfx = 0;
958         unsigned ispn = 0;
959
960         log_msg( LOG_INFO,
961                 "end batch:\n"
962                 "\t#blks %u total len %u free %u (%.1f%%) overhead %u (%.1f%%)\n"
963                 "\t#keys %u total len %u (%.1f%%) avg %.1f max %u\n"
964                 "\t#vals %u total len %u (%.1f%%) avg %.1f max %u spans %u",
965                 bt->len, size, ba->free, ba->free*100./size, sove, sove*100./size,
966                 ba->keys, ba->keyl, ba->keyl*100./size, (float)ba->keyl/ba->keys, ba->maxl,
967                 ba->vals, sval, sval*100./size, (float)ba->vals/ba->keys, ba->maxv, ba->span
968         );
969
970         /* build tree */
971         for (;;) { /* levels */
972                 unsigned  len = 0; /* current is n'th block on level */
973                 Block        *b = 0;
974                 unsigned  have = 0; /* room in current block b */
975                 unsigned char*base = 0; /* of current block b */
976                 Dict         *d = 0; /* of current block b */
977                 unsigned next = left; /* child */
978
979                 left = bt->len;
980                 /* get child c and create child pointer in block b */
981                 while ( next ) {
982                         unsigned char  vln = 0;
983                         unsigned char *key;
984                         unsigned char  kln;
985                         unsigned  need;
986                         Block *c = getblk( bt, next, lev, LBT_EX );
987                         if ( ! c )
988                                 return log_msg( ERR_NOMEM, "no child block" );
989                         next = c->nxt;
990                         if ( ! c->ent ) /* OOPS empty node */
991                                 continue;
992                         key = LBT_BASE(c)+LBT_POS(c->dct);
993                         kln = c->dct[0].kln;
994                         if ( ! b ) /* the leftmost child on this level */
995                                 kln = 0; /* always 0 key on left */
996                         else if ( lev )
997                                 vln = LBT_VLN(c->dct);
998                         else if ( ba->key.len == kln /* compare key to last of prev. */
999                                 && !memcmp( ba->key.byt, key, kln ) /* which was saved in ba->key */
1000                         ) { /* key spans blocks */
1001                                 LOG_DBG( LOG_DEBUG, "found span key '%.*s' child %u",
1002                                         kln, key, c->num );
1003                                 vln = bt->vsz; /* qualify with value */
1004                         } else {
1005 #if 1
1006                                 unsigned char x = 0;
1007                                 /* find prefix key */
1008                                 if ( kln > ba->key.len )
1009                                         kln = ba->key.len;
1010                                 while ( x < kln && key[x] == ba->key.byt[x] )
1011                                         x++;
1012                                 if ( x < kln ) /* they differ at [x] */
1013                                         kln = x+1;
1014                                 else /* equal on the shorter length */
1015                                         kln++;
1016                                 if ( kln > c->dct[0].kln )
1017                                         return log_msg( ERR_TRASH,
1018                                                 "bad key '%.*s' on level %u matching previous of length %u",
1019                                                 c->dct[0].kln, key, lev, ba->key.len );
1020                                 LOG_DBG( LOG_DEBUG, "prefix on key '%.*s' child %u save %u",
1021                                         kln, key, c->num, c->dct[0].kln - kln );
1022                                 ipfx += c->dct[0].kln - kln;
1023 #endif
1024                         }
1025                         if ( vln )
1026                                 ispn++;
1027                         ikeyl += kln;
1028                         need = sizeof(Dict) + kln + vln + 4;
1029                         if ( need > have ) {
1030                                 if ( b ) {
1031                                         log_msg( LOG_VERBOSE,
1032                                                 "level %u block %u is full %u ent stack %u",
1033                                                 lev, b->num, b->ent, b->stk );
1034                                         b->nxt = bt->len; /* link */
1035                                         b->lck = 0;
1036                                         if ( WRULK( b, bt ) ) goto nowrite;
1037                                         len++;
1038                                 }
1039                                 c->lck = LBT_EX; /* paranoia */
1040                                 b = NEWBLK( bt, lev+1, LBT_EX );
1041                                 c->lck = 0;
1042                                 if ( ! b )
1043                                         return log_msg( ERR_NOMEM, "no free block" );
1044                                 if ( b->ent || b->stk != bt->bsz )
1045                                         return log_msg( ERR_TRASH,
1046                                                 "new block %u is not empty: ent %u stk %u",
1047                                                 b->num, b->ent, b->stk );
1048                                 base = LBT_BASE( b );
1049                                 b->lck = LBT_EX; /* lock */
1050                                 have = bt->bsz - LBT_DOFF; /* - ba->let; TODO: should we ??? */
1051                                 d = b->dct;
1052                                 len++;
1053                                 iblk++;
1054                         }
1055                         SETINT( base+(b->stk -= 4), c->num );
1056                         d->kln = kln;
1057                         d->vln = vln;
1058                         if ( kln += vln ) /* key + consecutive qualifying value */
1059                                 memcpy( base+(b->stk -= kln), key, kln );
1060                         d->pos = b->stk;
1061                         d++;
1062                         b->ent++;
1063                         ikey++;
1064                         have -= need;
1065                         /* save the last dance for me */
1066                         memcpy( ba->key.byt, LBT_BASE(c)+LBT_POS(&c->dct[c->ent-1]),
1067                                 ba->key.len = c->dct[c->ent-1].kln );
1068                 }
1069                 if ( !len )
1070                         return log_msg( ERR_TRASH, "level %u was empty", lev );
1071                 /* close level */
1072                 log_msg( LOG_INFO, "closing level %u with %u blocks %u..%u",
1073                         lev, len, left, b->num );
1074                 b->lck = 0;
1075                 if ( 1 == len ) {
1076                         memcpy( bt->root, b, LBT_BOFF + bt->bsz );
1077                         b = bt->root;
1078                         b->num = 0;
1079                 }
1080                 if ( WRULK( b, bt ) ) goto nowrite;
1081                 lev++;
1082                 if ( 2 > len )
1083                         break;
1084         }
1085         log_msg( LOG_INFO,
1086                 "\tused %u inner blocks %u keys %u bytes avg %.1f\n"
1087                 "\t%u #spans (%.1f%%) %u pfx (- %.1f%%)",
1088                 iblk, ikey, ikeyl, (float)ikeyl/ikey,
1089                 ispn, ispn*100.*bt->vsz/ikeyl, ipfx, ipfx*100./(ipfx+ikeyl) );
1090
1091         mFree( bt->bat );
1092         bt->bat = 0;
1093         return 0;
1094 nowrite:
1095         return log_msg( LOG_SYSERR, "\twhen writing batch block" );
1096 }       /* mktree */
1097
1098
1099
1100 /* ************************************************************
1101         package data
1102 */
1103
1104
1105
1106 /* ************************************************************
1107         package functions
1108 */
1109
1110 int lbt_batch ( Idx *bt, unsigned char pctfree )
1111 {
1112         bt->bat = (Batch*)mAlloc( sizeof(Batch) );
1113         if ( ! bt->bat )
1114                 return log_msg( ERR_NOMEM, "\twhen getting batch" );
1115         memset( bt->bat, 0, sizeof(Batch) );
1116         if ( pctfree > 50 )
1117                 pctfree = 50;
1118         bt->bat->let = pctfree * bt->bsz / 100;
1119         lio_trunc( &bt->fd, 0 );
1120         iniblk( bt->root, bt, 0, 0 );
1121         /* TODO: kill hash */
1122         bt->len = 1;
1123         bt->bat->cur = NEWBLK( bt, 0, LBT_EX );
1124         return 0;
1125 }       /* lbt_batch */
1126
1127
1128 int lbt_batchval ( Idx *bt, Key *key )
1129 {
1130         int cmp = 0;
1131         Batch *ba = bt->bat;
1132         if ( key != &ba->key ) {
1133                 int l = key->len < ba->key.len ? key->len : ba->key.len; /* min */
1134                 if ( ! (cmp = memcmp(key->byt, ba->key.byt, l)) )
1135                         cmp = (int)key->len - (int)ba->key.len;
1136                 if ( 0 > cmp )
1137                         return log_msg( LOG_ERROR, "batch key '%.*s' < last '%.*s'",
1138                                 key->len, key->byt, ba->key.len, ba->key.byt );
1139         }
1140         /* flush ? */
1141         while ( ! key->val.len /* end */
1142                 || LBT_ENTMAX == ba->got /* full */
1143                 || cmp /* next key */
1144         ) {
1145                 Block *b = ba->cur;
1146                 unsigned have = b->stk - LBT_DOFF - b->ent*sizeof(Dict) - ba->let;
1147                 unsigned need = sizeof(Dict) + ba->key.len;
1148                 unsigned want = need + ba->got * bt->vsz;
1149                 unsigned take = ba->got;
1150                 if ( have < want ) { /* you can't always get ... */
1151                         if ( have <= need )
1152                                 take = 0;
1153                         else {
1154                                 take = (have - need) / bt->vsz;
1155                                 if ( 256 > take * bt->vsz )
1156                                         take = 0;
1157                         }
1158                 }
1159 #if 0 /* prefix key turbo ??? -- doesn't seem to work all too well */
1160                 if ( have < (bt->bsz >> 1) ) { /* ready to waste half a block ? */
1161                         Dict *d = &b->dct[ b->ent-1 ];
1162                         unsigned char *last = LBT_BASE(b)+LBT_POS(d);
1163                         if ( ba->key.byt[0] != last[0]
1164                                 || ba->key.byt[1] != last[1]
1165                                 || ba->key.byt[2] != last[2]
1166                         )
1167                                 take = 0;
1168                 }
1169 #endif
1170                 LOG_DBG( LOG_DEBUG, "batch '%.*s' got %u need %u want %u have %u take %u",
1171                         ba->key.len, ba->key.byt, ba->got, need, want, have, take );
1172                 /* we have enough room in b for take entries */
1173                 if ( take ) {
1174                         unsigned char *base = LBT_BASE( b );
1175                         unsigned char *s = base + b->stk;
1176                         int i = take;
1177                         Dict *d = &b->dct[ b->ent++ ];
1178                         while ( i-- )
1179                                 memcpy( s -= bt->vsz, ba->val[i].byt, bt->vsz );
1180                         memcpy( s -= ba->key.len, ba->key.byt, ba->key.len );
1181                         b->stk = (unsigned short) (s - base);
1182                         LBT_PV( d, b->stk, take );
1183                         d->kln = ba->key.len;
1184                         if ( ba->got > take )
1185                                 memcpy( ba->val, ba->val+take, sizeof(ba->val[0])*(ba->got - take) );
1186                         ba->got -= take;
1187                 }
1188                 if ( ! ba->got )
1189                         break;
1190                 if ( take ) /* key spans blocks */
1191                         ba->span++;
1192                 ba->free += b->stk - LBT_DOFF - b->ent*sizeof(Dict); /* wasted */
1193                 b->nxt = bt->len; /* link */
1194                 LOG_DBG( LOG_DEBUG, "batch: %u entries left, getting block %u",
1195                         ba->got, bt->len );
1196                 if ( WRULK( b, bt ) )
1197                         return log_msg( LOG_SYSERR, "\twhen writing batch block" );
1198                 /* so couldn't flush all, thus we need another block */
1199                 b = ba->cur = NEWBLK( bt, 0, LBT_EX );
1200                 /* will continue on end or keychange */
1201                 if ( b->ent || b->stk < bt->bsz )
1202                         return log_msg( ERR_TRASH, "new block %u is not empty: ent %u stk %u",
1203                                 b->num, b->ent, b->stk );
1204                 LOG_DBG( LOG_VERBOSE, "new block %u has %u ent %u stk, will keep %u",
1205                         b->num, b->ent, b->stk, ba->let );
1206         }
1207         if ( cmp || ! key->val.len ) /* done with key: stats */
1208                 if ( ba->maxv < ba->tot )
1209                         ba->maxv = ba->tot;
1210         if ( key->val.len ) {
1211                 if ( cmp ) {    /* set new key */
1212                         if ( ba->got )
1213                                 return log_msg( ERR_IDIOT, "batch not empty on new key" );
1214                         ba->tot = 0;
1215                         memcpy( ba->key.byt, key->byt, ba->key.len = key->len );
1216                         ba->keys++;
1217                         ba->keyl += key->len;
1218                         if ( ba->maxl < key->len )
1219                                 ba->maxl = key->len;
1220                 }
1221                 /* add new val */
1222                 /* LOG_HEX( key->val.byt, bt->vsz ); */
1223                 memcpy( ba->val[ ba->got++ ].byt, key->val.byt, bt->vsz );
1224                 ba->vals++;
1225                 ba->tot++;
1226         } else {
1227                 WRULK( ba->cur, bt );
1228                 mktree( bt );
1229         }
1230         
1231         return 0;
1232 }       /* lbt_batchval */
1233
1234
1235 /* TODO: this function is an awful mess and needs complete rewrite, KR
1236 */
1237 int lbt_add ( Idx *bt, Key *key )
1238 {
1239         int ret = 0;
1240         unsigned short e, i = 0;
1241         Block *b, *s = 0; /* mainblock, sibling */
1242         Dict *d;
1243         unsigned char *base, *sbase, *src, *v, *stk;
1244         FindKey f;
1245         int hadkey, split = 0, insibling = 0;
1246         unsigned need;
1247         PrVal pv;
1248
1249         f.bt  = bt;
1250         f.key = key;
1251         if ( bt->key && key->len > bt->key )
1252                 key->len = bt->key;
1253         if ( findkey( &f, LBT_WR ) ) {
1254                 if ( f.b[0] )
1255                         ULK( f.b[0] );
1256                 b = 0;
1257                 LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1258         }
1259         e = (LBT_ENTMASK & f.e[0]);
1260         b = f.b[0];
1261         base = LBT_BASE(b);
1262         d = b->dct + e;
1263         if ( ! (hadkey = LBT_MATCH & f.e[0]) ) /* new key */
1264                 need = key->len + sizeof(Dict) + bt->vsz;
1265         else {
1266                 unsigned vln = LBT_VLN(d);
1267                 v = base+LBT_POS(d)+d->kln;
1268                 if ( 1 == vln ) { /* check stopword -- value entirely zeroed */
1269                         int l = bt->vsz;
1270                         unsigned char *C = v;
1271                         while ( l-- && !*C++ ) /* huh huh, he said "C++" */
1272                                 ;
1273                         if ( -1 == l ) {
1274                                 LOG_DBG( LOG_VERBOSE, "key '%.*s' is a stopword", key->len, key->byt );
1275                                 goto done;
1276                         }
1277                 }
1278                 i = valpos( v, vln, bt->vsz, key->val.byt );
1279                 if ( LBT_MATCH & i ) {
1280                         LOG_DBG( LOG_DEBUG,
1281                                 "adding key '%.*s'/%s: already there pos %u",
1282                                 key->len, key->byt, PRVAL( pv, key->val ), i & ~LBT_MATCH );
1283                         goto done; /* value already there */
1284                 }
1285                 need = bt->vsz;
1286         }
1287         LOG_DBG( LOG_DEBUG,
1288                 "adding key '%.*s'/%s in block %d had %d ent %d stk %d need %d",
1289                 key->len, key->byt, PRVAL( pv, key->val ),
1290                 b->num, hadkey, b->ent, b->stk, need );
1291
1292         if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
1293                 /* now that's bad: not enough space
1294                         create new block s, move last entries there
1295                         starting from the end, entries (including the new one) are collected,
1296                         until their combined size reaches more than half a block.
1297                         If the combined size then is more than 2/3 a block,
1298                         and the last entry alone takes more than 1/3 a block, it is split.
1299                         TODO: check existing sibling for free space
1300                 */
1301                 unsigned half  = (bt->bsz - LBT_DOFF) >> 1;
1302                 unsigned tot   = 0;
1303                 unsigned mv = b->ent;
1304
1305                 LOG_DBG( LOG_DEBUG, "splitting block %d need %d have %d (stk %d ent %d)",
1306                         b->num, need, b->stk - LBT_DOFF - b->ent*sizeof(Dict), b->stk, b->ent );
1307                 if ( need > (bt->bsz - LBT_DOFF) / 3 )
1308                         LOG_OTO( done, (ERR_IDIOT, "OOPS! excessive need for %d bytes", need) );
1309                 /* figure out split position */
1310                 if ( !hadkey && b->ent == e ) {
1311                         tot = need;
1312                         insibling = !0;
1313                 }
1314                 d = b->dct+mv;
1315                 while ( d--, mv-- ) {
1316                         unsigned sz = sizeof(Dict)+d->kln+LBT_VLN(d)*bt->vsz;
1317                         if ( half > (tot += sz) ) { /* sz fits */
1318                                 if ( e == mv && !hadkey ) {
1319                                         /*
1320                                                 check size of new entry.
1321                                                 In case of hadkey, we don't bother for the bt->vsz bytes.
1322                                         */
1323                                         if ( half < (tot + need) ) {
1324                                                 if ( (half - tot) > need/2 ) { /* more balanced if we move */
1325                                                         tot += need;
1326                                                         insibling = !0;
1327                                                 }
1328                                                 break;
1329                                         }
1330                                         tot += need;
1331                                         insibling = !0;
1332                                 }
1333                                 if ( mv )
1334                                         continue; /* else croak on ! mv below */
1335                         }
1336                         /* exceeded half -- maybe we move entry mv anyway */
1337                         if ( tot > (bt->bsz - LBT_DOFF)*2/3 ) {
1338                                 if ( tot-sz < (bt->bsz - LBT_DOFF)/3 )
1339                                         split = !0;
1340                                 else { /* undo -- don't move this entry */
1341                                         tot -= sz;
1342                                         d++; mv++;
1343                                 }
1344                         } else if ( ! mv )
1345                                 LOG_OTO( done, (ERR_IDIOT, "OOPS! didn't find end tot %d bytes", tot) );
1346                         break;
1347                 }       /* while mv-- */
1348                 if ( mv < e || (mv == e && hadkey && !split) )
1349                         insibling = !0;
1350                 /* now mv is the first entry to move */
1351                 s = NEWBLK( bt, 0, LBT_WR );
1352                 LOG_DBG( LOG_DEBUG,
1353                         "split: move %d of %d to new blk %d e %d tot %d insibling %d split %d",
1354                         mv, b->ent, s->num, e, tot, insibling, split );
1355                 s->nxt = b->nxt;
1356                 b->nxt = s->num;
1357                 sbase = LBT_BASE(s);
1358                 if ( split ) {
1359                         /* now total includes full size of entry mv.
1360                                 if we included a new entry, it might even exceed bsz.
1361                                 calculate sizes for both blocks w/o mv's values.
1362                         */
1363                         unsigned esz = sizeof(Dict)+d->kln; /* entry's base cost */
1364                         unsigned vln = LBT_VLN(d);
1365                         unsigned keep, diff, mvval, kpval, mvsiz, pos = LBT_POS(d);
1366                         /*
1367                                 from currently used space, substract tot (including all
1368                                 values and possibly new key "need"), but add entry base cost.
1369                                 that should be the size remaining if we where moving
1370                                 all of the values but keep an entry and key.
1371                                 maybe slightly biased if need is bt->vsz (the hadkey case).
1372                         */
1373                         keep = bt->bsz - b->stk + b->ent*sizeof(Dict) + need - tot + esz;
1374                         tot -= vln * bt->vsz; /* undo entries */
1375                         diff = (keep > tot ? keep - tot : tot - keep) / bt->vsz;
1376                         if ( diff > vln )
1377                                 LOG_OTO( done, (ERR_IDIOT,
1378                                         "OOPS! got diff %d vln %d entry %d tot %d keep %d",
1379                                         diff, vln, mv, tot, keep) );
1380                         mvval = (vln - diff)/2;
1381                         if ( keep > tot )
1382                                 mvval += diff;
1383                         kpval = vln - mvval;
1384                         if ( ! mvval || ! kpval ) {
1385                                 LOG_DBG( LOG_WARN, "bad mv/kpval %d/%d keep %d tot %d vln %d diff %d",
1386                                         mvval, kpval, keep, tot, vln, diff );
1387                                 if ( ! mvval )
1388                                         kpval = vln - (mvval = 1);
1389                                 else
1390                                         mvval = vln - (kpval = 1);
1391                         }
1392                         tot += mvsiz = mvval*bt->vsz;
1393                         keep += kpval*bt->vsz;
1394                         src = base+pos;
1395                         LOG_DBG( LOG_DEBUG,
1396                                 "split: entry %d '%.*s' mv %d of %d values, i %d keep %d tot %d",
1397                                 mv, d->kln, src, mvval, vln, i, keep, tot );
1398                         if ( keep > (bt->bsz-LBT_DOFF) || tot > (bt->bsz-LBT_DOFF) )
1399                                 LOG_OTO( done, (ERR_IDIOT,
1400                                         "OOPS! got diff %d vln %d mvval %d entry %d tot %d keep %d",
1401                                         diff, vln, mvval, mv, tot, keep) );
1402                         /* SIGH -- do it */
1403                         memcpy( sbase + (s->stk -= mvsiz),
1404                                 src+d->kln+kpval*bt->vsz, mvsiz );
1405                         if ( e == mv && hadkey && i > kpval ) {
1406                                 insibling = !0;
1407                                 s->stk -= bt->vsz;
1408                                 i -= kpval;
1409                                 memmove( sbase+s->stk, sbase+s->stk+bt->vsz, i*bt->vsz );
1410                                 memcpy( sbase+s->stk+i*bt->vsz, key->val.byt, bt->vsz );
1411                                 mvval++;
1412                         }
1413                         memcpy( sbase + (s->stk -= d->kln), src, d->kln );
1414                         LBT_PVK( s->dct, s->stk, mvval, d->kln );
1415                         s->ent = 1;
1416                         memmove( src + mvsiz, src, d->kln+kpval*bt->vsz );
1417                         pos += mvsiz;
1418                         LBT_PV( d, pos, kpval );
1419                         d++; mv++;
1420                 }
1421                 /* now entries mv..b->ent are moved, mv > 0 */
1422                 b->stk = LBT_POS(b->dct+mv-1); /* pre-adjust stack */
1423                 if ( mv < e ) { /* move entries mv..e-1 */
1424                         /* used stack space is from pos of entry e-1 to end of entry mv
1425                                 (might differ from pos of mv-1 due to split)
1426                         */
1427                         unsigned from = LBT_POS(b->dct+e-1), nent = e-mv,
1428                                 sz = LBT_POS(b->dct+mv)+b->dct[mv].kln
1429                                         +LBT_VLN(b->dct+mv)*bt->vsz - from;
1430                         int adj = (s->stk -= sz) - from; /* entry position adjustment */
1431                         memcpy( sbase + s->stk, base+from, sz );
1432                         memcpy( s->dct+s->ent, b->dct+mv, nent*sizeof(Dict) );
1433                         d = s->dct + s->ent;
1434                         s->ent += nent;
1435                         for ( ; nent--; d++ )
1436                                 LBT_POSADD( d, adj );
1437                 }
1438                 if ( insibling && mv <= e ) { /* now mv or insert e, unless done on split */
1439                         d = b->dct + e;
1440                         if ( hadkey ) { /* mv existing entry e >= mv, add new val */
1441                                 /* i is valpos */
1442                                 unsigned vln = LBT_VLN(d);
1443                                 src = base+LBT_POS(d)+d->kln;
1444                                 if ( i < vln ) {
1445                                         unsigned sz = (vln - i)*bt->vsz;
1446                                         memcpy( sbase + (s->stk -= sz), src + i*bt->vsz, sz );
1447                                 }
1448                                 memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
1449                                 if ( i ) {
1450                                         unsigned sz = i*bt->vsz;
1451                                         memcpy( sbase + (s->stk -= sz), src, sz );
1452                                 }
1453                                 memcpy( sbase + (s->stk -= d->kln), base+LBT_POS(d), d->kln );
1454                                 LBT_PVK( s->dct + s->ent, s->stk, vln+1, d->kln );
1455                                 e++; /* e is moved */
1456                         } else {
1457                                 memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
1458                                 memcpy( sbase + (s->stk -= key->len), key->byt, key->len );
1459                                 LBT_PVK( s->dct + s->ent, s->stk, 1, key->len );
1460                         }
1461                         s->ent++;
1462                 } /* mv or insert e */
1463                 if ( e < b->ent && mv < b->ent ) { /* move entries max(e,mv) to end */
1464                         unsigned fst = mv < e ? e : mv;
1465                         /* used stack space is from pos of entry b->ent-1 (the original b->stk)
1466                                 to end of entry fst (might differ from pos of fst-1 due to split)
1467                         */
1468                         unsigned from = LBT_POS(b->dct+b->ent-1), nent = b->ent-fst,
1469                                 sz = LBT_POS(b->dct+fst)+b->dct[fst].kln
1470                                         +LBT_VLN(b->dct+fst)*bt->vsz - from;
1471                         int adj = (s->stk -= sz) - from; /* entry position adjustment */
1472                         LOG_DBG( LOG_DEBUG, "split: entries %d..%d sz %d from %d to %d",
1473                                 fst, b->ent-1, sz, from, s->stk );
1474                         memcpy( sbase + s->stk, base+from, sz );
1475                         memcpy( s->dct+s->ent, b->dct+fst, nent*sizeof(Dict) );
1476                         d = s->dct + s->ent;
1477                         s->ent += nent;
1478                         for ( ; nent--; d++ ) {
1479 #if !defined(NDEBUG)
1480                                 unsigned p = LBT_POS(d);
1481 #endif
1482                                 LBT_POSADD( d, adj );
1483 #if !defined(NDEBUG)
1484                                 LOG_DBG( LOG_DEBUG, "new ent %d p/v/k %d/%d/%d o %d",
1485                                         d - s->dct, LBT_POS(d), LBT_VLN(d), d->kln, p );
1486 #endif
1487                         }
1488                 }
1489                 b->ent = mv; /* kill entries */
1490                 LOG_DBG( LOG_DEBUG,
1491                         "split: blks %d/%d with %d/%d entries, s starts '%.*s'",
1492                         b->num, s->num, b->ent, s->ent, s->dct[0].kln, sbase+LBT_POS(s->dct) );
1493         }       /* new sibling */
1494
1495         if ( ! insibling ) {
1496                 /* now we do have enough space */
1497                 d = b->dct + e;
1498                 stk = base + b->stk; /* bottom of data stack */
1499                 if ( hadkey ) { /* insert value at i */
1500                         unsigned char *val = base + LBT_POS(d)+d->kln+i*bt->vsz;
1501                         need = bt->vsz;
1502 #if 0
1503                 LOG_DBG( LOG_DEBUG,
1504                         "in entry %d '%.*s' val %d: memmove( %p %p %d) val %p need %d",
1505                         e, d->kln, base+LBT_POS(d), i,
1506                         stk-need, stk, (val-stk)+need, val, need );
1507 #endif
1508                         memmove( stk - need, stk, (val - stk)+need );
1509                         memcpy( val - need, key->val.byt, bt->vsz );
1510                         LBT_VLNADD( d, 1 );
1511                 } else { /* insert key, value and dict entry */
1512                         /* take dict and data space from entry e */
1513                         unsigned char *nu;
1514                         need = key->len + bt->vsz;
1515                         if ( e == b->ent )
1516                                 nu = base + b->stk - need;
1517                         else {
1518                                 /* at end of next entry, i.e. end of block or beg of previous */
1519                                 nu = base + LBT_POS(d) + d->kln + LBT_VLN(d)*bt->vsz;
1520                                 memmove( stk - need, stk, nu - stk );
1521                                 nu -= need;
1522                                 memmove( d+1, d, (b->ent - e)*sizeof(Dict) );
1523                         }
1524                         memcpy( nu, key->byt, key->len );
1525                         memcpy( nu + key->len, key->val.byt, bt->vsz );
1526                         LBT_PV( d, nu-base, 1 );
1527                         d->kln = key->len;
1528                         d++;
1529                         e++;
1530                         b->ent++;
1531                 }
1532                 /*
1533                         need is set to the needed (i.e. additionally used) data space,
1534                         e is the first dict entry to redirect need bytes lower
1535                 */
1536                 b->stk -= need;
1537                 for ( ; e < b->ent; e++, d++ )
1538                         LBT_POSADD( d, -need );
1539         }       /* ! insibling */
1540         while ( s ) { /* created sibling */
1541                 unsigned num = s->num;
1542                 unsigned char *e0 = LBT_BASE(s) + LBT_POS(s->dct);
1543                 unsigned l = s->lev + 1;
1544                 Block *ins;
1545                 Key k;
1546
1547                 memcpy( k.byt, e0, k.len = s->dct[0].kln );
1548                 k.val.len = 0;
1549                 if ( s->lev ? s->dct[0].vln : split )
1550                         memcpy( k.val.byt, e0+k.len, k.val.len = bt->vsz );
1551                 LOG_DBG( LOG_DEBUG, "split: had %d/%d on lev %d key '%.*s' vln %d %s",
1552                         b->num, num, s->lev, k.len, k.byt, k.val.len, PRVAL( pv, k.val ) );
1553
1554                 /* get parent */
1555                 if ( f.b[l]->num == f.n[l] && ! LBT_BUSY(f.b[l]) )
1556                         f.b[l]->lck = LBT_WR;
1557                 else
1558                         f.b[l] = getblk( bt, f.n[l], l, LBT_WR );
1559                 LOG_DBG( LOG_DEBUG,
1560                         "split: got parent %d lev %d", f.b[l]->num, f.b[l]->lev );
1561                 /* MT TODO: again L/Y link hunt */
1562                 /* ok, locked the parent -- release s and b */
1563                 if ( (ret = WRULK( s, bt )) || (ret = WRULK( b, bt )) )
1564                         break;
1565                 s = 0;
1566                 ins = b = f.b[l];
1567                 base = LBT_BASE(b);
1568                 e = KEYPOS( bt, b, &k );
1569                 LOG_DBG( LOG_DEBUG, "split: got parent pos %d", e );
1570                 if ( LBT_MATCH & e ) /* !??? */
1571                         LOG_OTO( done, (ERR_TRASH, "OOPS! found key in parent" ) );
1572                 if ( ! e ) {
1573                         TRACEKEY( ERR_TRASH, &f );
1574                         LOG_OTO( done, (ERR_TRASH, "OOPS! parent insert at 0 lev %d", l ) );
1575                 }
1576                 need = sizeof(Dict) + k.len + 4 + k.val.len;
1577                 if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
1578                         unsigned half = ((bt->bsz - b->stk)+b->ent*sizeof(Dict)+need) / 2;
1579                         unsigned mv = b->ent, tot = 0, nent, sz;
1580                         int adj;
1581                         if ( e == b->ent ) {
1582                                 tot = need;
1583                                 ins = 0;
1584                         }
1585                         for ( d = b->dct+mv; d--, mv--; ) {
1586                                 sz = sizeof(Dict) + d->kln + 4 + d->vln;
1587                                 if ( half > (tot += sz) ) {
1588                                         if ( e == mv ) {
1589                                                 if ( half > tot+need || tot+need-half < half-tot ) {
1590                                                         tot += need;
1591                                                         ins = 0; /* new link goes to sibling */
1592                                                 } else
1593                                                         break;
1594                                                 if ( half <= tot )
1595                                                         break;
1596                                         }
1597                                         if ( mv )
1598                                                 continue;
1599                                 }
1600                                 if ( tot - half > sz/2 ) { /* don't move this */
1601                                         tot -= sz;
1602                                         d++; mv++;
1603                                 } else if ( ! mv ) /* !??? */
1604                                         mv = 1;
1605                                 break;
1606                         }
1607                         /* split here a little bit simpler */
1608                         s = NEWBLK( bt, l, LBT_WR );
1609                         s->nxt = b->nxt;
1610                         b->nxt = s->num;
1611                         /* straight move entries mv..b->ent-1 to sibling */
1612                         sbase = LBT_BASE(s);
1613                         d = b->dct + mv;
1614                         sz = b->dct[mv-1].pos - b->stk;
1615                         nent = b->ent - mv;
1616                         LOG_DBG( LOG_DEBUG,
1617                                 "split: parent lev %d split %d to new %d mv %d"
1618                                 " '%.*s'/%s nent %d sz %d",
1619                                 l, b->num, s->num, mv, d->kln, base+d->pos,
1620                                 prval( pv, base+d->pos+d->kln, d->vln ), nent, sz );
1621                         memcpy( sbase + (s->stk -= sz), base + b->stk, sz );
1622                         adj = s->stk - b->stk;
1623                         b->stk += sz;
1624                         memcpy( s->dct, d, nent*sizeof(Dict) );
1625                         b->ent = mv;
1626                         s->ent = nent;
1627                         for ( d = s->dct; nent--; d++ )
1628                                 d->pos += adj;
1629                         if ( ! ins ) {
1630                                 ins = s;
1631                                 e -= mv;
1632                         }
1633                         if ( (ret = chkblk( s, bt )) )
1634                                 break;
1635                 }
1636                 /* now insert link in block ins at pos e */ {
1637                         unsigned isz = k.len + 4 + k.val.len, nent = ins->ent - e;
1638                         unsigned char *ibase = LBT_BASE(ins);
1639                         unsigned end = e ? ins->dct[e-1].pos : bt->bsz;
1640                         LOG_DBG( LOG_DEBUG,
1641                                 "split: ins parent %d lev %d pos %d key '%.*s' -> %d vln %d %s",
1642                                 ins->num, ins->lev, e, k.len, k.byt, num, k.val.len,
1643                                 PRVAL( pv, k.val ) );
1644                         memmove( ibase + ins->stk - isz, ibase + ins->stk, end - ins->stk );
1645                         ins->stk -= isz;
1646                         memcpy( ibase + (end -= 4), &num, 4 );
1647                         if ( k.val.len )
1648                                 memcpy( ibase + (end -= k.val.len), k.val.byt, k.val.len );
1649                         memcpy( ibase + (end -= k.len), k.byt, k.len );
1650                         for ( d = ins->dct+ins->ent; nent--; d-- ) {
1651                                 *d = d[-1];
1652                                 d->pos -= isz;
1653                         }
1654                         d->pos = end;
1655                         d->vln = k.val.len;
1656                         d->kln = k.len;
1657                         ins->ent++;
1658                         if ( (ret = chkblk( ins, bt )) )
1659                                 break;
1660                 }
1661                 if ( ! s ) /* write b below or in next turn */
1662                         break;
1663                 if ( ! b->num ) { /* root split */
1664                         Block *bb = NEWBLK( bt, l, LBT_WR );
1665                         LOG_DBG( LOG_DEBUG, "root split!" );
1666                         /* cp all but the num */
1667                         memcpy( LBT_BASE(bb)+4, LBT_BASE(b)+4, bt->bsz-4 );
1668                         b->nxt = s->nxt; /* restore root's special nxt */
1669                         s->nxt = 0;
1670                         b->stk = bt->bsz; /* empty the root */
1671                         /* add two childs bb and s */
1672                         SETINT( base + (b->stk -= 4), bb->num );
1673                         LBT_PVK( b->dct, b->stk, 0, 0 );
1674                         SETINT( base + (b->stk -= 4), s->num );
1675                         e0 = LBT_BASE(s) + LBT_POS(s->dct);
1676                         if ( s->dct[0].vln )
1677                                 memcpy( base + (b->stk -= bt->vsz), e0+s->dct[0].kln, bt->vsz );
1678                         memcpy( base + (b->stk -= s->dct[0].kln), e0, s->dct[0].kln );
1679                         LBT_PVK( b->dct+1, b->stk, s->dct[0].vln, s->dct[0].kln );
1680                         b->ent = 2;
1681                         b->lev++; /* increase tree depth */
1682                         if ( (ret = WRULK( s, bt )) || (ret = WRULK( bb, bt )) )
1683                                 break;
1684                         s = 0;
1685                         break;
1686                 }
1687         }
1688         if ( b && !ret )
1689                 ret = WRULK( b, bt );
1690 done:
1691         if ( s )
1692                 ULK( s );
1693         if ( b )
1694                 ULK( b );
1695         if ( !ret && b->lev && findkey( &f, LBT_RD ) )
1696                 ret = log_msg( ERR_TRASH, "could not find key after add" );
1697         return ret;
1698 }       /* lbt_add */
1699
1700
1701 int lbt_del ( Idx *bt, Key *key )
1702 {
1703         int ret = 0;
1704         unsigned short e, n, i, mv;
1705         Block *b;
1706         Dict *d;
1707         unsigned char *base, *v;
1708         FindKey f;
1709         PrVal pv;
1710
1711         f.bt  = bt;
1712         f.key = key;
1713         LOG_DBG( LOG_DEBUG, "deleting key '%.*s'/%s",
1714                 key->len, key->byt, PRVAL( pv, key->val ) );
1715         if ( findkey( &f, LBT_WR ) )
1716                 LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1717         if ( !(LBT_MATCH & f.e[0]) ) /* key not found - ok */
1718                 goto done;
1719         LOG_DBG( LOG_DEBUG, "key found" );
1720         e = (LBT_ENTMASK & f.e[0]);
1721         b = f.b[0];
1722         d = b->dct + e;
1723         base = LBT_BASE(b);
1724         v = base+LBT_POS(d)+d->kln; /* start of values */
1725         n = LBT_VLN(d);
1726         if ( ! n )
1727                 goto done;
1728         i = valpos( v, n, bt->vsz, key->val.byt );
1729         LOG_DBG( LOG_DEBUG, "val %sfound pos %u of %u",
1730                 (LBT_MATCH & i) ? "" : "not ", i & ~LBT_MATCH, n );
1731         if ( !(LBT_MATCH & i) ) /* val not found - ok */
1732                 goto done;
1733         i &= ~LBT_MATCH;
1734         LOG_DBG( LOG_DEBUG,
1735                 "del blk %d e %d i/n %d/%d", b->num, e, i, n );
1736         if ( 1 < n ) {
1737                 v += i*bt->vsz; /* end of stack area to move: start of this value */
1738                 mv = bt->vsz; /* amount to move: one value size */
1739                 LBT_VLNADD( d, -1 );
1740         } else { /* sole value -- rm whole entry */
1741                 v = base+LBT_POS(d); /* start of key */
1742                 mv = d->kln+bt->vsz;
1743                 for ( i=b->ent - e - 1; i--; d++ ) /* move the i entries after d down */
1744                         *d = d[1];
1745                 d = b->dct + e; /* reset */
1746                 b->ent--;
1747         }
1748         memmove( base+b->stk+mv, base+b->stk, v-(base+b->stk) );
1749         b->stk += mv;
1750         for ( ; e++ < b->ent; d++ )
1751                 LBT_POSADD( d, mv );
1752         WRULK( b, bt );
1753 done:
1754         if ( f.b[0] )
1755                 ULK( f.b[0] );
1756         return ret;
1757 }       /* lbt_del */
1758
1759
1760
1761 int lbt_loop ( Idx *bt, DXLoop *l )
1762 {
1763         int ret = 0;
1764         unsigned i = 0;
1765         int mode = IDXMODE & l->flg;
1766         Block *b = 0;
1767         Key *cmp = &l->key;
1768         if ( ! cmp->len ) /* start at 1st leaf */
1769                 b = getblk( bt, 1, 0, LBT_RD );
1770         else { /* locate key */
1771                 FindKey f;
1772                 f.bt  = bt;
1773                 f.key = cmp;
1774                 if ( findkey( &f, LBT_RD ) )
1775                         LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1776                 b = f.b[0];
1777                 i = (unsigned)(LBT_ENTMASK & f.e[0]);
1778                 if ( !(LBT_MATCH & f.e[0]) && IDXEQ == mode )
1779                         goto done;
1780         }
1781         if ( IDXUPTO <= mode )
1782                 cmp = &l->to;
1783         for ( ;; i=0 ) { /* blocks */
1784                 unsigned char *base = LBT_BASE( b );
1785                 Dict *d;
1786                 if ( ! b || b->lev )
1787                         return -ERR_TRASH;
1788                 b->lck = LBT_EX; /* lock */
1789                 d = b->dct + i;
1790                 for ( ; i<b->ent; i++, d++ ) {
1791                         unsigned short pos = LBT_POS( d );
1792                         unsigned short vln = LBT_VLN( d );
1793                         unsigned short kln = d->kln;
1794                         unsigned short j;
1795                         unsigned char *v = base+pos+kln;
1796                         Key key;
1797                         Hit hit;
1798                         int diff = memcmp( cmp->byt, base+pos,
1799                                 kln <= cmp->len ? kln : cmp->len );
1800                         switch ( mode ) {
1801                         case IDXPF:
1802                                 if ( diff || kln < cmp->len )
1803                                         goto moan;
1804                                 break;
1805                         case IDXEQ:
1806                                 if ( diff || kln != cmp->len )
1807                                         goto done;
1808                                 break;
1809                         case IDXUPTO:
1810                                 if ( 0 >= diff )
1811                                         goto done;
1812                                 break;
1813                         case IDXINCL:
1814                                 if ( 0 > diff || (!diff && kln > cmp->len) )
1815                                         goto done;
1816                                 break;
1817                         default: /* unused */
1818                                 moan:
1819                                 log_msg( LOG_INFO,
1820                                         "loop ends on key '%.*s'(%d) %d %d",
1821                                         kln, base+pos, kln, diff, cmp->len );
1822                                 goto done;
1823                         }
1824                         memcpy( key.byt, base+pos, key.len = kln );
1825                         if ( l->cb )
1826                                 for ( j=0; j<vln; j++, v += bt->vsz ) {
1827                                         rdval( &hit, v, bt->vsz );
1828                                         ret = l->cb( l->me, &key, &hit );
1829                                         if ( ret )
1830                                                 goto done;
1831                                 }
1832                         else { /* internal check */
1833                                 FindKey f;
1834                                 f.bt  = bt;
1835                                 f.key = &key;
1836                                 memcpy( key.val.byt, v, key.val.len = bt->vsz ); 
1837                                 if ( findkey( &f, LBT_RD ) )
1838                                         LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1839                                 if ( !(LBT_MATCH & f.e[0])
1840                                         || b != f.b[0]
1841                                         || i != (unsigned)(LBT_ENTMASK & f.e[0])
1842                                 ) {
1843                                         log_msg( ERR_TRASH,
1844                                                 "OOPS! want %u(%p).%u got %u(%p).%u (match %x)",
1845                                                 b->num, b, i, f.b[0]->num, f.b[0], LBT_ENTMASK & f.e[0], f.e[0] );
1846                                         rdval( &hit, v, bt->vsz );
1847                                         log_msg( ERR_TRASH,
1848                                                 "\twhen looking for '%.*s'/%u.%u.%u.%u",
1849                                                 key.len, key.byt, hit.mfn, hit.tag, hit.occ, hit.pos );
1850                                         if ( f.b[0]->ent <= (LBT_ENTMASK & f.e[0]) )
1851                                                 log_msg( ERR_TRASH, "\tI found nothing" );
1852                                         else {
1853                                                 Dict *dd = f.b[0]->dct + (LBT_ENTMASK & f.e[0]);
1854                                                 unsigned char *bb = LBT_BASE(f.b[0])+LBT_POS(dd);
1855                                                 rdval( &hit, bb+dd->kln, bt->vsz );
1856                                                 log_msg( ERR_TRASH,
1857                                                         "\tI found '%.*s'/%u.%u.%u.%u",
1858                                                         dd->kln, bb, hit.mfn, hit.tag, hit.occ, hit.pos );
1859                                         }
1860                                 } else
1861                                         log_msg( LOG_TRACE, "key ok" );
1862                         }
1863                 }
1864                 ULK( b );
1865                 if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
1866                         break;
1867         }
1868 done:
1869         if ( b )
1870                 ULK( b );
1871         if ( l->cb )
1872                 l->cb( l->me, 0, 0 );
1873         else
1874                 log_msg( LOG_INFO,
1875                         "checked %i keys in %u blks of %dK depth %u cmp/get/miss %u/%u/%u",
1876                         stat.key, bt->len, bt->bsz>>10, bt->root->lev, stat.cmp, stat.get, stat.miss );
1877         return ret;
1878 }       /* lbt_loop */
1879
1880
1881 /* mimik the plain old ldb_search */
1882 int lbt_search ( Idx *bt, Key *key, LdbPost *post, Rec *rec )
1883 {
1884         int ret = 0;
1885         unsigned i = 0;
1886         int cont = rec && rec->len; /* continuing previous terms search */
1887         char *stk = rec ? ((char*)rec + rec->bytes) : 0; /* term stack */
1888         int prefix;
1889         Block *b = 0;
1890         Key start;
1891         LdbP * const p = post->p; /* really just an alias */
1892
1893         /* check for prefix match */
1894         if ( post ) {
1895                 if ( 8 != bt->vsz )
1896                         return log_msg( ERR_INVAL, "bad legacy search on vsz %d", bt->vsz );
1897                 prefix = LDB_PFX & post->mode;
1898         } else {
1899                 if ( ! rec )
1900                         return log_msg( ERR_INVAL, "lbt_search needs rec or post" );
1901                 if ( (prefix = key->len && '$' == key->byt[key->len - 1]) )
1902                         key->len--;
1903         }
1904
1905         if ( ! cont )
1906                 start = *key;
1907         else {
1908                 memset( &start, 0, sizeof(start) );
1909                 start.len = 255 < rec->field[rec->len-1].len
1910                         ? 255 : rec->field[rec->len-1].len;
1911                 memcpy( start.byt, rec->field[rec->len-1].val, start.len );
1912                 rec->len = 0;
1913         }
1914                 
1915         if ( ! start.len ) /* start at 1st leaf */
1916                 b = getblk( bt, 1, 0, LBT_RD );
1917         else { /* locate key */
1918                 FindKey f;
1919                 f.bt  = bt;
1920                 f.key = &start;
1921                 if ( findkey( &f, LBT_RD ) )
1922                         LOG_OTO( ouch, ( ERR_TRASH, "could not find key" ) );
1923                 b = f.b[0];
1924                 i = (unsigned)(LBT_ENTMASK & f.e[0]);
1925                 if ( !(LBT_MATCH & f.e[0]) && ! prefix )
1926                         goto done;
1927         }
1928
1929         for ( ;; i=0 ) { /* blocks */
1930                 unsigned char *base = LBT_BASE( b );
1931                 Dict *d;
1932                 if ( ! b || b->lev )
1933                         return -ERR_TRASH;
1934                 b->lck = LBT_EX; /* lock */
1935                 d = b->dct + i;
1936                 for ( ; i<b->ent; i++, d++ ) { /* entries */
1937                         unsigned char *ent = base + LBT_POS( d ); /* start of key */
1938                         unsigned short kln = d->kln;
1939                         if ( memcmp( key->byt, ent, kln <= key->len ? kln : key->len )
1940                                 || (prefix ? kln < key->len : kln != key->len )
1941                         )
1942                                 goto done;
1943                         if ( rec ) { /* record term */
1944                                 Field *f = rec->field + rec->len; /* field to assign */
1945                                 /** append term to record, unless it's the same as previous */
1946                                 if ( rec->len ) {
1947                                         if ( kln == f[-1].len && !memcmp( f[-1].val, ent, kln ) )
1948                                                 continue;
1949                                 } else if ( cont ) {
1950                                         if ( kln == start.len && !memcmp( start.byt, ent, kln ) )
1951                                                 continue;
1952                                 }
1953                                 stk -= kln;
1954                                 if ( stk < (char*)(f+1) ) /* stack reached field dict */
1955                                         goto done;
1956                                 memcpy( stk, ent, kln );
1957                                 f->tag = 0;
1958                                 f->val = stk;
1959                                 f->len = kln;
1960                                 rec->len++;
1961                                 continue;
1962                         }
1963                         { /* more locals */
1964                         LdbP merge[1024];
1965                         unsigned short vln, k, m=0;
1966                         unsigned char *c;
1967                         int f = post->fil-1; /* highest pos to consider in given postings */
1968                         if ( !(vln = LBT_VLN( d )) )
1969                                 continue;
1970                         ent += kln; /* set to start of values */
1971                         c = ent + 8*(vln-1); /* last value */
1972                         /* collect postings */
1973                         for ( k=vln; k--; c-=8 ) {
1974                                 /* loop backwards (for the fun of it) postings in segment */
1975                                 int prow, ptag, ppos;
1976                                 LdbP e; /* the entry */
1977                                 LdbP samerow; /* highest possible entry w/ same row as e */
1978 #if defined( LDB_BIG_ENDIAN )
1979                                 /* the 8 bytes of a posting are BIG ENDIAN ! */
1980                                 memcpy(e.bytes,c,8);
1981 #else
1982                                 e.bytes[0] = c[7];      e.bytes[1] = c[6];
1983                                 e.bytes[2] = c[5];      e.bytes[3] = c[4];
1984                                 e.bytes[4] = c[3];      e.bytes[5] = c[2];
1985                                 e.bytes[6] = c[1];      e.bytes[7] = c[0];
1986 #endif
1987                                 prow = LDBP_ROW( &e );
1988                                 ptag = LDBP_TAG( &e );
1989                                 ppos = LDBP_POS( &e );
1990                                 LOG_DBG( LOG_VERBOSE, "post %d.%hd pos %06x", prow, ptag, ppos );
1991                                 if ( !prow )
1992                                         continue;
1993                                 if ( (post->cut && prow >= post->cut)
1994                                         || (post->tag && post->tag != ptag)
1995                                 )
1996                                         continue;
1997                                 if ( prow < post->skp ) /* quickly bail out on skip condition */
1998                                         break;
1999                                 LDBP_SETROWTOP( &samerow, &e ); /* for mfn comparison */
2000                                 /* sweep down to postings for the same row as e ... */
2001                                 while ( f >= 0 && LDBP_GT( p+f, &samerow ) )
2002                                         f--;
2003                                 if ( LDB_AND & post->mode ) {
2004                                         int l;
2005                                         /* loop postings for same row, mark all (that are near enough) */
2006                                         LDBP_SETROWBOT( &samerow, &e ); /* for mfn comparison */
2007                                         /* NOTE: postings for row are GT than bottom even if marked */
2008                                         for ( l = f; l>=0 && LDBP_GT( p+l, &samerow ); l-- ) {
2009                                                 if ( post->near ) {
2010                                                         int dist;
2011                                                         if ( ptag != LDBP_TAG( p+l ) ) continue;
2012                                                         if ( LDB_NEAR_G != post->near ) {
2013                                                                 dist = LDBP_POS( p+l ) - LDBP_POS( &e );
2014                                                                 if ( dist < 0 ) dist = -dist;
2015                                                                 if ( 0 < post->near
2016                                                                         ? post->near < dist
2017                                                                         : -post->near != dist /* exact $$$$ */
2018                                                                 ) continue;
2019                                                         }
2020                                                 }
2021                                                 LDBP_SETMARK( p+l );
2022                                         }
2023                                 } else {        /* OR mode */
2024                                         int add;
2025                                         if ( ! post->near ) /* add if row not found: ignore details */
2026                                                 add = 0 > f || prow > LDBP_ROW( p+f );
2027                                         else {  /* add if no exact match */
2028                                                 int l;
2029                                                 /* NOTE: we don't use mark bit in OR mode, do we ? */
2030                                                 for ( l = f; l>=0 && LDBP_GT( p+l, &e ); l-- )
2031                                                         ;
2032                                                 add = 0 > l || LDBP_GT( &e, p+l );
2033                                         }
2034                                         if ( add )
2035                                                 merge[ m++ ] = e;
2036                                 }
2037                         }       /* for postings in segment */
2038                         if ( m ) { /* merge in the merge buffer */
2039                                 LdbP *mm = merge;
2040                                 for ( k = post->fil += m; m && k--; ) {
2041                                         LdbP src;
2042                                         if ( k < m || LDBP_GT( mm, &p[k-m] ) ) {
2043                                                 src = *mm++;
2044                                                 m--;
2045                                                 LOG_DBG( LOG_DEBUG, "merging %d at %d", LDBP_ROW(&src), k );
2046                                         } else
2047                                                 src = p[k-m];
2048                                         if ( k < post->len )
2049                                                 p[k] = src;
2050                                         else { /* set cut */
2051                                                 int row = LDBP_ROW( &src );
2052                                                 if ( row < post->cut || !post->cut )
2053                                                         post->cut = row;
2054                                         }
2055                                 }
2056                                 if ( post->fil > post->len )
2057                                         post->fil = post->len;
2058                                 if ( post->cut ) /* postings for cut row are unreliable */
2059                                         while ( post->fil && post->cut <= LDBP_ROW(p+post->fil-1) )
2060                                                 post->fil--;
2061                         } /* if ( m ) */
2062                         } /* more locals */
2063                 }       /* for entries in block */
2064                 ULK( b );
2065                 if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
2066                         break;
2067         }
2068 done:
2069         if ( rec )
2070                 ret = rec->len;
2071 ouch:
2072         if ( b )
2073                 ULK( b );
2074         return ret;
2075 }       /* lbt_search */
2076
2077
2078 int lbt_init ( Idx *bt )
2079 {
2080         int ret = 0, blkbits, i;
2081         Block *l;
2082         int size = lio_size( bt->fd );
2083
2084         bt->root = 0;
2085         bt->hash = 0;
2086         bt->mem = 0;
2087         for ( i=LBT_LRULEV; i--; )
2088                 bt->lru[i] = bt->mru[i] = 0;
2089
2090         if ( 0x3ff & size ) /* not on 1K boundary */
2091                 return log_msg( LOG_ERROR, "bt has bad size %dK + %d",
2092                         size >> 10, (int)size & 0x3ff );
2093         if ( size ) { /* read root header */
2094                 Block b;
2095                 unsigned want = sizeof(b) - LBT_BOFF;
2096                 int got = lio_pread( &bt->fd, LBT_BASE(&b), want, 0 );
2097                 if ( want != (unsigned)got )
2098                         return log_msg( LOG_SYSERR, "could not get root head" );
2099                 if ( b.num )
2100                         return log_msg( ERR_TRASH, "root has num %d", b.num );
2101                 /*if ( LBT_ISIS != b.nxt ) log_msg( LOG_WARN, "magic is 0x%08x", b.nxt );*/
2102                 if ( bt->typ && bt->typ != b.typ )
2103                         log_msg( LOG_WARN, "typ want 0x02%x got 0x02%x", bt->typ, b.typ );
2104                 bt->typ = b.typ;
2105                 if ( bt->key && bt->key != b.key )
2106                         log_msg( LOG_WARN, "key want %d got %d", bt->key, b.key );
2107                 bt->key = b.key;
2108                 if ( bt->col && bt->col != b.col )
2109                         log_msg( LOG_WARN, "col want %d got %d", bt->col, b.col );
2110                 bt->col = b.col;
2111                 bt->dpt = b.lev;
2112                 /*
2113                 b.ent
2114                 b.stk
2115                 */
2116         } else { /* new index */
2117                 bt->dpt = 1;
2118                 if ( ! bt->key )
2119                         bt->key = 30;
2120         }
2121
2122         bt->vsz = 8+(0xf & bt->typ);
2123         blkbits = 10 + (3 & (bt->typ >> 4));
2124         bt->bsz = 1 << blkbits;
2125         if ( size % bt->bsz )
2126                 return log_msg( ERR_TRASH, "size 0x%08x % blksize", size, bt->bsz );
2127         bt->len = size >> blkbits;
2128         if ( 2 > bt->len && !(LBT_WRITE & bt->flg) )
2129                 return log_msg( LOG_SYSERR, "attempt to read empty index" );
2130
2131         bt->hlen = 16 + bt->len / 32;   /* initial cache size */
2132         if ( bt->hlen < 1000 ) bt->hlen = 1000;
2133         if ( !(bt->root = addchunk( bt, bt->hlen )) )
2134                 return log_msg( LOG_SYSERR, "\twhen getting bt cache" );
2135         if ( bt->hlen < 256 ) /* the hash */
2136                 bt->hlen = 256;
2137         if ( !(bt->hash = (Block**)mAlloc( bt->hlen * sizeof(Block*) )) )
2138                 return log_msg( LOG_SYSERR, "\twhen getting bt hash" );
2139         memset( bt->hash, 0, bt->hlen * sizeof(Block*) );
2140
2141         if ( size && readblk( bt->root, bt, 0 ) )
2142                 return log_msg( ERR_TRASH, "could not read root" );
2143         if ( ! bt->len ) {
2144                 /* init: link root to leaf */
2145                 SETINT( LBT_BASE(bt->root)+(bt->root->stk -= 4), 1 );
2146                 LBT_PVK( bt->root->dct, bt->root->stk, 0, 0 );
2147                 bt->root->ent = 1;
2148                 bt->len = 1;
2149         }
2150         if ( !(l = getblk( bt, 1, 0, LBT_EX )) )
2151                 return log_msg( ERR_TRASH, "could not read 1st leaf" );
2152         if ( ! size ) {
2153                 log_msg( LOG_INFO, "creating index bsz %u ksz %u vsz %u",
2154                         bt->bsz, bt->key, bt->vsz );
2155                 bt->root->lev = 1;
2156                 if ( WRULK( bt->root, bt ) || WRULK( l, bt ) )
2157                         return log_msg( LOG_SYSERR, "\twhen writing base" );
2158         }
2159
2160         return ret;
2161 }       /* lbt_init */
2162
2163
2164 void lbt_close ( Idx *bt )
2165 {
2166         Chunk *c, *n = bt->mem;
2167         while ( (c = n) ) {
2168                 n = c->nxt;
2169                 mFree( c );
2170         }
2171         mFree( bt->hash );
2172         lio_close( &bt->fd, LIO_INOUT );
2173         memset( bt, 0, sizeof(*bt) );
2174 }       /* lbt_close */
2175
2176
2177 /* ************************************************************
2178         public functions
2179 */
2180
2181
2182 void cXMkVal ( Idx *bt, Val *val, Hit *hit )
2183 {
2184         if ( hit ) {
2185                 val->len = bt->vsz;
2186                 mkval( val, hit );
2187         } else
2188                 val->len = 0;
2189 }
2190
2191 int cXAdd ( Idx *bt, Key *key, Hit *hit )
2192 {
2193         if ( bt->bat ) {
2194                 if ( ! key )
2195                         key = &bt->bat->key;
2196                 cXMkVal( bt, &key->val, hit );
2197                 return lbt_batchval( bt, key );
2198         } else if ( !key )
2199                 return 0;
2200         else if ( hit ) {
2201                 cXMkVal( bt, &key->val, hit );
2202                 if ( hit->dbn == 0xffff )
2203                         return lbt_del( bt, key );
2204         }
2205         return lbt_add( bt, key );
2206 }       /* cXAdd */