including openisis 0.9.0 into webpac tree
[webpac] / openisis / lbt.c
diff --git a/openisis/lbt.c b/openisis/lbt.c
new file mode 100644 (file)
index 0000000..68a2905
--- /dev/null
@@ -0,0 +1,2206 @@
+/*
+       openisis - an open implementation of the CDS/ISIS database
+       Version 0.8.x (patchlevel see file Version)
+       Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org
+
+       This library is free software; you can redistribute it and/or
+       modify it under the terms of the GNU Lesser General Public
+       License as published by the Free Software Foundation; either
+       version 2.1 of the License, or (at your option) any later version.
+
+       This library is distributed in the hope that it will be useful,
+       but WITHOUT ANY WARRANTY; without even the implied warranty of
+       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+       Lesser General Public License for more details.
+
+       You should have received a copy of the GNU Lesser General Public
+       License along with this library; if not, write to the Free Software
+       Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+       see README for more information
+EOH */
+
+/*
+       $Id: lbt.c,v 1.35 2003/05/27 11:03:30 kripke Exp $
+       the btree.
+*/
+
+#include <stdlib.h>
+#include <string.h>
+
+/* special
+#if defined( __GNUC__ ) && defined ( alloca )
+#include <alloca.h>
+#endif
+*/
+#if defined( sparc ) || defined( __ppc__ )
+#      define LDB_BIG_ENDIAN
+#endif
+
+#include "lio.h"
+#include "luti.h"
+#include "lbt.h"
+
+
+/*
+probably all relevant cases can be handled by the more
+efficient approach of storing recoded key a la strxfrm
+*/
+#define LBT_USECMP 0
+
+
+/* ************************************************************
+       private types
+*/
+
+enum {
+       /* little endian magic text */
+       LBT_OPEN = 'O' | 'P'<<8 | 'E'<<16 | 'N'<<24,
+       LBT_ISIS = 'I' | 'S'<<8 | 'I'<<16 | 'S'<<24
+};
+
+
+enum {
+       LBT_LEVELS  = 16, /* max depth */
+       LBT_POSBITS = 13, /* bits for position within block */
+       LBT_POSMASK = 0x1fff,
+       LBT_POSMAX  = LBT_POSMASK /* obviously :) */
+};
+
+typedef struct Dict { /* dictionary of block entries */
+       unsigned short pos; /* 13 bits position of entry, 2 bits val for leaves */
+       unsigned char  vln; /* length of entry's value */
+               /* for inner nodes, this is the length of qualifying
+                       second key segment (either 0 or bt->vsz <= 23).
+                       for leaves, this, together with higher bits of pos,
+                       is the number of values of bt->vsz each.
+               */
+       unsigned char  kln; /* length of entry's key */
+/*
+       the following access macros are needed for leaves only
+*/
+#define LBT_POS( d ) (LBT_POSMASK & (d)->pos)
+#define LBT_VLN( d ) ((d)->vln | (~LBT_POSMASK & (d)->pos)>>(LBT_POSBITS-8))
+#define LBT_POSADD( d, a ) do { \
+               (d)->pos = (~LBT_POSMASK & (d)->pos) | (LBT_POSMASK & ((d)->pos + (a))); \
+       } while (0)
+#define LBT_VLNADD( d, a ) do { \
+               unsigned _vln = LBT_VLN( d ) + (a); \
+               (d)->pos = (LBT_POSMASK & (d)->pos) \
+                       | (~LBT_POSMASK & ((_vln)<<(LBT_POSBITS-8))); \
+               (d)->vln = (unsigned char)(_vln); \
+       } while (0)
+#define LBT_PV( d, p, v ) do { \
+               (d)->pos = (LBT_POSMASK & (p)) | (~LBT_POSMASK & ((v)<<(LBT_POSBITS-8))); \
+               (d)->vln = (unsigned char)(v); \
+       } while (0)
+#define LBT_PVK( d, p, v, k ) do { \
+               LBT_PV( d, p, v ); \
+               (d)->kln = (unsigned char)(k); \
+       } while (0)
+} Dict;
+
+
+/* guard space around block */
+#ifndef NDEBUG
+#      define LBT_GUARD 64
+/* template for guard space -- blank or pattern */
+static unsigned char guard[LBT_GUARD];
+#else
+#      define LBT_GUARD 0
+#endif
+
+
+/*
+       TODO: check the possible performance gain in findkey
+       from separating disk block and administrative data
+       and aligning disk blocks on PAGE boundaries in memory
+*/
+typedef struct Block {
+#if LBT_GUARD
+       unsigned char  grd[LBT_GUARD];
+#endif
+       unsigned char  ird; /* is reading */
+       unsigned char  wrd; /* #threads waiting for read */
+       unsigned char  lck; /* is write locked */
+       unsigned char  wlk; /* #threads waiting for write lock */
+#define LBT_BUSY( b ) (*(int*)&(b)->ird) /* any of 1st 4 bytes != 0 */
+       struct Block  *nhs; /* next in hash chain */
+       struct Block  *nru; /* next (more) recently used */
+       /* TODO: possibly double-link lru list, so we can unlink on every access ? */
+       /* the disk block -- from here on bsz bytes are on disk */
+#define LBT_BOFF ((char *)&((Block*)0)->num - (char*)0)
+#define LBT_BASE( b ) ((unsigned char *)&(b)->num)
+       unsigned       num; /* number of this block, 0 is root */
+       unsigned       nxt; /* right sibling, "OPEN" or "ISIS" for root */
+       /* typ,key,col are redundant in non-root blocks, maybe abused ... */
+       unsigned char  typ; /* type: bsz, vsz, flags */
+       unsigned char  key; /* max key length */
+       unsigned char  col; /* collation */
+       unsigned char  lev; /* level over bottom, 0 for leaves */
+       unsigned short ent; /* 10 bits # entries */
+#define LBT_ENT( b ) (b)->ent /* currently other bits arent abused */
+       unsigned short stk; /* 13 bits offset of keyspace stack */
+/* offset of dict RELATIVE to base */
+#define LBT_DOFF ((char *)&((Block*)0)->dct - (char*)&((Block*)0)->num)
+       Dict           dct[1];
+}      Block;
+
+/*
+       LAYOUT:
+       dct is an array of dictionary entries in increasing key order
+       stk is growing downwards from block end towards dict
+       position of entry i is b->stk <= LBT_POS(b->dct+i) < bt->bsz
+       decreasing on i (i.e. stack space is in reverse ordering)
+
+       at LBT_POS(b->dct+i), there are b->dct[i].kln bytes key
+       for a leaf entry, the key is followed by
+       LBT_VLN(b->dct+i) (between 0..1023 incl.) values
+       of bt->vsz (between 8..23 incl) each
+       for an inner entry, the key is followed by
+       4 bytes block number and optionally bt->vsz qualifying value
+
+       SIZES:
+       block size >= 1024
+       block base overhead LBT_DOFF is 16 bytes
+       avail size >= 1008
+       Dict entry is 4 bytes
+       max. used stack space per entry is:
+       inner node: kln+4+bt->vsz <= 255+4+23 = 282
+       new leaf entry (one value): kln+bt->vsz <= 278
+       max total space for a new entry incl. dict is 286 < avail size/3
+*/
+
+typedef struct Chunk {
+       struct Chunk *nxt; 
+}      Chunk;
+
+
+enum {
+       /* number of levels for lru list */
+       LBT_LRULEV  = sizeof(((Idx*)0)->lru)/sizeof(((Idx*)0)->lru[0]),
+       LBT_ENTBITS = 10, /* bits for # enries */
+       LBT_ENTMASK = 0x03ff,
+       LBT_ENTMAX  = LBT_ENTMASK,
+       LBT_MATCH   = 0x8000, /* entry matched */
+       LBT_ENTSIZE = 8 /* minimum cost of entry: 4 byte dict, 4 byte val */
+};
+
+
+typedef struct Batch {
+       Block   *cur;
+       unsigned got; /* current for this key */
+       unsigned tot; /* total for this key */
+       unsigned let;
+       /* stats */
+       unsigned keys;
+       unsigned keyl;
+       unsigned maxl;
+       unsigned vals;
+       unsigned maxv;
+       unsigned span;
+       unsigned free;
+       /* buffer */
+       Key      key;
+       Val      val[LBT_ENTMAX];
+}      Batch;
+
+
+typedef enum {
+       LBT_RD, /* read -- no lock */
+       LBT_WR, /* write */
+       LBT_EX /* exclusive */
+} lbt_get;
+
+
+typedef struct {       /* structure filled by findkey */
+       Idx           *bt;      /* the tree */
+       Key           *key; /* the key searched */
+       Block         *b[LBT_LEVELS]; /* path to leaf, b[0] is leaf */
+       unsigned       n[LBT_LEVELS]; /* block numbers of path */
+       unsigned short e[LBT_LEVELS]; /* entries in path */
+}      FindKey;
+
+
+typedef struct {
+       unsigned get;
+       unsigned miss;
+       unsigned cmp;
+       unsigned key;
+}      Stat;
+
+/* buffer to print a value to.
+       lowest 11 bytes are printed as 4 shorts + one int < 4*5+10 = 30 digits+5sep
+       higher 12 bytes are printed hex = 24 bytes.
+*/
+typedef char PrVal[64];
+
+
+/* ************************************************************
+       private data
+*/
+
+/*
+       stat access is not guarded by memory barrier.
+       may be wrong when running on multiple processors
+*/
+static Stat stat;
+
+
+/* ************************************************************
+       private functions
+*/
+
+#if defined( sparc )
+static unsigned GETINT ( void *m ) /* bus error safe version of *(int*) */
+{
+       unsigned l;
+       memcpy( &l, m, 4 );
+       return l;
+}
+static void SETINT ( void *m, unsigned i )
+{
+       memcpy( m, &i, 4 );
+}
+#else
+#define GETINT( m ) (*(unsigned*)(m))
+#define SETINT( m, i ) (*(unsigned*)(m) = (i))
+#endif
+
+static void mkval ( Val *v, Hit *ifp )
+{
+       unsigned char *c = v->byt;
+       switch ( v->len ) {
+       default: memset( c, 0, v->len-11 ); c += v->len-11;
+       case 11: *c++ = (unsigned char) (ifp->dbn >> 8);
+       case 10: *c++ = (unsigned char)  ifp->dbn;
+       case 9:  *c++ = (unsigned char) (ifp->mfn >> 24);
+       case 8:  *c++ = (unsigned char) (ifp->mfn >> 16);
+       case 7:  *c++ = (unsigned char) (ifp->mfn >> 8);
+       case 6:  *c++ = (unsigned char)  ifp->mfn;
+       case 5:  *c++ = (unsigned char) (ifp->tag >> 8);
+       case 4:  *c++ = (unsigned char)  ifp->tag;
+       case 3:  *c++ = (unsigned char)  ifp->occ;
+       case 2:  *c++ = (unsigned char) (ifp->pos >> 8);
+       case 1:  *c++ = (unsigned char)  ifp->pos;
+       case 0:  ;
+       }
+}      /* mkval */
+
+
+static Hit *rdval ( Hit *ifp, unsigned char *c, unsigned vsz )
+{
+       ifp->mfn = ifp->tag = ifp->occ = ifp->pos = ifp->dbn = 0;
+       switch ( vsz ) {
+       default: c += vsz-11;
+       case 11: ifp->dbn  = (unsigned short)*c++ << 8;
+       case 10: ifp->dbn |=                 *c++;
+       case 9:  ifp->mfn  = (unsigned  int)*c++ << 24;
+       case 8:  ifp->mfn |= (unsigned  int)*c++ << 16;
+       case 7:  ifp->mfn |= (unsigned  int)*c++ << 8;
+       case 6:  ifp->mfn |=                 *c++;
+       case 5:  ifp->tag  = (unsigned short)*c++ << 8;
+       case 4:  ifp->tag |=                 *c++;
+       case 3:  ifp->occ  =                 *c++;
+       case 2:  ifp->pos  = (unsigned short)*c++ << 8;
+       case 1:  ifp->pos |=                 *c++;
+       case 0:  ;
+       }
+       return ifp;
+}      /* rdval */
+
+
+static char *prval ( PrVal buf, unsigned char *c, unsigned vsz )
+{
+       char *d = buf;
+       Hit h;
+       if ( ! vsz ) {
+               buf[0] = '#';
+               buf[1] = 0;
+               return buf;
+       }
+       if ( 23 < vsz )
+               vsz = 23;
+       if ( 11 < vsz ) {
+               while ( 11 < vsz-- ) {
+                       *d++ = luti_hex[ *c >> 4 ];
+                       *d++ = luti_hex[ *c++ & 0xf ];
+               }
+               *d++ = '.';
+       }
+       rdval( &h, c, vsz );
+       d += u2a( d, h.dbn ); *d++ = '.';
+       d += u2a( d, h.mfn ); *d++ = '.';
+       d += u2a( d, h.tag ); *d++ = '.';
+       d += u2a( d, h.occ ); *d++ = '.';
+       d += u2a( d, h.pos ); *d++ = '.';
+       *d = 0;
+       return buf;
+}      /* prval */
+
+/* for PRVAL f->key->val */
+#define PRVAL( buf, v ) prval( buf, v.byt, v.len )
+
+
+/* ************************************************************
+       block I/O & cache mgmt
+*/
+
+static void iniblk ( Block *b, Idx *bt, unsigned num, unsigned char lev )
+{
+       memset( b, 0, LBT_BOFF + bt->bsz + LBT_GUARD );
+       b->num = num;
+       b->typ = bt->typ;
+       b->key = bt->key;
+       b->col = bt->col;
+       b->lev = lev;
+       b->stk = bt->bsz;
+}      /* iniblk */
+
+
+static int chkblk ( Block *b, Idx *bt )
+{
+#if 0
+       (void)b; (void)bt;
+#else
+       int i;
+       unsigned short eod = LBT_DOFF + b->ent*sizeof(Dict), siz, pos, vln, key;
+       unsigned end = bt->bsz, tot = 0;
+       unsigned char *base = LBT_BASE(b);
+#if LBT_GUARD
+       if ( memcmp( guard, b->grd, LBT_GUARD ) )
+               return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
+       if ( memcmp( guard, LBT_BASE(b)+bt->bsz, LBT_GUARD ) )
+               return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
+#endif
+       if ( b->num > bt->len || (b->num && b->nxt > bt->len) )
+               return log_msg( ERR_TRASH,
+                       "bad num %d / nxt %d on lev %d", b->num, b->nxt, b->lev );
+       if ( b->typ != bt->typ || b->key != bt->key || b->col != bt->col )
+               return log_msg( ERR_TRASH,
+                       "bad sig 0x%02x,%d,%d in block %d want 0x%02x,%d,%d",
+                       b->typ, b->key, b->col, b->num, bt->typ, bt->key, bt->col );
+       if ( b->ent > bt->bsz / LBT_ENTSIZE || b->stk > bt->bsz || b->stk < eod )
+               return log_msg( ERR_TRASH,
+                       "bad ent/stk %d,%d in block %d eod %d sz %d",
+                       b->ent, b->stk, b->num, eod, bt->bsz );
+       for ( i=0; i<b->ent; i++ ) {
+               Dict *d = &b->dct[i];
+               pos = LBT_POS( d );
+               vln = LBT_VLN( d );
+               key = d->kln;
+               siz = key + (b->lev ? vln+4u : vln * bt->vsz);
+               if ( pos < b->stk || end != (unsigned)pos + siz )
+                       return log_msg( ERR_TRASH,
+                               "bad pos/vln/key %d/%d/%d in entry %d of block %d"
+                               " stack %d sz %d end %d expected %d",
+                               pos, vln, key, i, b->num, b->stk, siz, pos+siz, end );
+               if ( b->lev ) {
+                       unsigned lnk;
+                       if ( vln && vln != +bt->vsz )
+                               return log_msg( ERR_TRASH,
+                                       "bad vln %d in entry %d of block %d vsz %d",
+                                       vln, i, b->num, bt->vsz );
+                       lnk = GETINT( base + pos + key + vln );
+                       if ( lnk >= bt->len )
+                               return log_msg( ERR_TRASH,
+                                       "bad link %u in entry %d of block %d",
+                                       lnk, i, b->num );
+               }
+#if 1 && ! defined( NDEBUG )
+               if ( i ) {
+                       unsigned short op = LBT_POS( d-1 ), okl = d[-1].kln;
+                       int mc = memcmp( base+pos, base+op, key<okl ? key : okl ), j;
+                       if ( 0 < mc || (!mc && 0 < (mc = (int)key - (int)okl)) )
+                               goto ok;
+                       j = 0;
+                       if ( !mc && b->lev && vln ) { /* compare values */
+                               unsigned ovln = d[-1].vln;
+                               if ( ! ovln || 0 < (j = memcmp( base+pos+key, base+op+okl, vln )) )
+                                       goto ok;
+                       }
+                       return log_msg( ERR_TRASH,
+                               "bad key order entry %d of block %d lev %d"
+                               " mc %d j %d key %d okl %d vln %d",
+                               i, b->num, b->lev, mc, j, key, okl, vln
+                       );
+               }
+       ok:
+#endif
+               tot += siz;
+               end = pos;
+       }
+       if ( tot > bt->bsz - b->stk )
+               return log_msg( ERR_TRASH,
+                       "bad total entry size %d in block %d stack %d",
+                       tot, b->num, b->stk );
+#ifndef NDEBUG
+       if ( LOG_DO( LOG_TRACE ) ) {
+               log_msg( LOG_TRACE, 
+                       "block %d lev %d sig 0x%02x,%d,%d ent %d eod %d stk %d",
+                       b->num, b->lev, b->typ, b->key, b->col, b->ent, eod, b->stk
+               );
+               for ( i=0; i<2; i++ ) {
+                       int j = i ? b->ent-1 : 0;
+                       Dict *d = &b->dct[j];
+                       if ( i >= b->ent ) /* if b->ent is 0 or 1, log no or one key */
+                               break;
+                       pos = LBT_POS( d );
+                       vln = LBT_VLN( d );
+                       key = d->kln;
+                       end = pos + b->lev ? vln+4u : vln * bt->vsz;
+                       /* start of value */
+                       if ( b->lev )
+                               log_msg( LOG_TRACE, "key %d '%.*s'/%d -> %d",
+                                       j, key, base+pos, vln, GETINT(base+pos+key+vln) );
+                       else {
+                               Hit h;
+                               rdval( &h, base+pos+key, bt->vsz );
+                               log_msg( LOG_TRACE, "key %d '%.*s'[%d] = %d[%d,%d,%d]",
+                                       j, key, base+pos, vln, h.mfn, h.tag, h.occ, h.pos );
+                       }
+               }
+       }
+#endif
+#endif
+       return 0;
+}      /* chkblk */
+
+
+static Block *addchunk ( Idx *bt, unsigned cnt )
+{
+       unsigned ibsz = LBT_BOFF + bt->bsz + LBT_GUARD; /* internal block size */
+       /* increase cnt so it's > 0 and add one extra to return */
+       char *mem = (char *)mAlloc( sizeof(Chunk) + (1 + ++cnt)*ibsz );
+       Chunk *chk = (Chunk *)mem;
+       if ( ! mem )
+               return 0;
+       /* chain the chunk */
+       chk->nxt = bt->mem;
+       bt->mem = chk;
+       mem += sizeof(Chunk);
+       bt->clen += cnt;
+       /* add cnt blocks to level 0 lru list */
+       if ( ! bt->mru[0] ) /* level 0 cache is empty */
+               bt->mru[0] = (Block *)mem;
+       for ( ; cnt--; mem += ibsz ) {
+               Block *b = (Block *)mem;
+               iniblk( b, bt, 0 , 0 ); /* initialise empty block */
+               b->nru = bt->lru[0];
+               bt->lru[0] = b;
+       }
+       iniblk( (Block *)mem, bt, 0 , 0 );
+       return (Block *)mem;
+}      /* addchunk */
+
+
+static int readblk ( Block *b, Idx *bt, unsigned num )
+{
+       int got = lio_pread( &bt->fd, LBT_BASE(b), bt->bsz, num*bt->bsz );
+       log_msg( LOG_VERBOSE, "reading block %u got %u", num, got );
+       if ( bt->bsz != (unsigned)got )
+               return log_msg( LOG_SYSERR, "\twhen reading bt block %d", num );
+       if ( b->num != num )
+               return log_msg( ERR_TRASH, "bad num %d in block %d", b->num, num );
+       return chkblk( b, bt );
+} /* readblk */
+
+
+static void unlock ( Block *b )
+{
+       b->lck = 0;
+       if ( b->wlk )
+               (void)LIO_WAKE( b->num );
+}
+
+#define ULK( b ) do { \
+       if ( (b)->lck ) { (b)->lck = 0; if ( b->wlk ) unlock( b ); } \
+       } while (0)
+
+
+static int writeblk ( Block *b, Idx *bt, int ulk )
+{
+       int got;
+       if ( chkblk( b, bt ) ) /* don't write no shize */
+               return log_msg( ERR_TRASH, "\tbefore writing bt block %d", b->num );
+       got = lio_pwrite( &bt->fd, LBT_BASE(b), bt->bsz, b->num*bt->bsz );
+       log_msg( LOG_VERBOSE, "writing block %u got %u", b->num, got );
+       if ( ulk )
+               ULK( b );
+       if ( bt->bsz != (unsigned)got )
+               return log_msg( LOG_SYSERR, "\twhen writing bt block %d", b->num );
+       return 0;
+} /* writeblk */
+
+#define WRULK( b, bt ) writeblk( b, bt, 1 )
+
+
+#define NEWBLK( bt, lev, lock ) getblk( bt, bt->len, lev, lock )
+/**
+       get block #num.
+       1) locate block #num in cache.
+       2) if it is found:
+               if it is READING, or we want to lock and it is locked,
+               incr # waiters, wait for it.
+       3) if it is not found:
+               locate a free cache entry, extending the cache if necessary,
+               set the num, set the READING flag, go read it.
+               after reading, if there are waiters, wake them up.
+               since we are the first to get a hold on the block,
+               we may write lock it, if we want.
+       4) return the block.
+*/
+static Block *getblk ( Idx *bt, unsigned num,
+       unsigned char lev, lbt_get lock )
+{
+       Block *b;
+       stat.get++; /* dirty */
+       if ( !(stat.get & 0x3fff) ) { /* every 16K */
+               static unsigned lmiss;
+               log_msg( LOG_INFO,
+                       "searched %i keys in %u blks cache %d %d%%"
+                       " depth %u cmp/get/miss %u/%u/%u %d%% (%d %d%%)",
+                       stat.key, bt->len, bt->clen, bt->clen*100 / bt->len,
+                       bt->root->lev, stat.cmp, stat.get,
+                       stat.miss, stat.miss*100 / stat.get,
+                       (stat.miss - lmiss), (stat.miss - lmiss)*100 / 0x4000
+               );
+               lmiss = stat.miss;
+       }
+startover: /* only used on absurd congestion */
+       b = 0;
+       LOG_DBG( LOG_DEBUG, "get bt block %u %u %d", num, lev, lock );
+       if ( ! num )
+               b = bt->root;
+       else if ( num < bt->len ) {
+               b = bt->hash[ num % bt->hlen ];
+               while ( b && num != b->num )
+                       b = b->nhs;
+       }
+       if ( b )
+               while ( b->ird || (lock && b->lck) ) {
+                       int wrd = b->ird;
+                       int wlk = lock && b->lck;
+                       LOG_DBG( LOG_VERBOSE, "lck on block %u %u r%d l%d",
+                               num, lev, b->ird, b->lck );
+                       if ( LBT_EX <= lock ) {
+                               log_msg( LOG_ERROR, "concurrent mod on EX lock" );
+                               return 0;
+                       }
+                       if ( ! lio_lock ) {
+                               log_msg( LOG_FATAL, "bad MT environment: concurrent mod, no lock" );
+                               return 0;
+                       }
+                       if ( wrd && 0xff != b->wrd )
+                               b->wrd++;
+                       if ( wlk && 0xff != b->wlk )
+                               b->wlk++;
+                       (void)LIO_WAIT( num ); /* zzzzz .... sleep tight */
+                       /* some hours later */
+                       if ( num != b->num ) {
+                               log_msg( LOG_WARN, "heavy congestion: block %d has gone", num );
+                               goto startover;
+                       }
+                       if ( wrd && b->wrd )
+                               b->wrd--;
+                       if ( wlk && b->wlk )
+                               b->wlk--;
+               }
+       else { /* not found or new block */
+               /* find free block */
+               int i = 0, llev, err = 0;
+               stat.miss++; /* dirty */
+               LOG_DBG( LOG_DEBUG, "bt block %u %u not found", num, lev );
+               for ( ; i < (int)LBT_LRULEV; i++ ) {
+                       /* check lru list for level i */
+                       Block **bp = &bt->lru[i];
+                       while ( (b = *bp) && LBT_BUSY(b) )
+                               bp = &b->nru;
+                       if ( ! b ) /* try next level */
+                               continue;
+                       *bp = b->nru; /* dequeue */
+                       if ( bt->mru[i] == b ) /* was the tail */
+                               bt->mru[i] = 0;
+                       break;
+               }
+               if ( ! b ) {
+                       log_msg( LOG_WARN, "no free blocks in cache" );
+                       if ( !(b = addchunk( bt, 64 )) )
+                               return 0;
+                       /* TODO: should we also extend the hash ? */
+               } else if ( b->num ) { /* got block -- should be hashed somewhere */
+                       Block **bp = &bt->hash[ b->num % bt->hlen ];
+                       while ( *bp && *bp != b )
+                               bp = &(*bp)->nhs;
+                       if ( *bp == b )
+                               *bp = b->nhs; /* dehash */
+                       /* else ... hmmm should have found it */
+               }
+               assert( ! LBT_BUSY( b ) );
+               iniblk( b, bt, num, lev );
+               /* hash it, so waiters can find it */
+               b->nhs = bt->hash[ b->num % bt->hlen ];
+               bt->hash[ b->num % bt->hlen ] = b;
+               if ( num == bt->len ) { /* extend */
+                       b->lev = lev;
+                       bt->len++;
+                       /* TODO: should be configurable */
+                       if ( bt->len > bt->clen << 5 && bt->clen < 0x4000 )
+                               addchunk( bt, bt->clen / 4 );
+               } else {
+                       b->ird = 1;
+                       (void)LIO_RELE();
+                       err = readblk( b, bt, num );
+                       (void)LIO_LOCK();
+                       b->ird = 0;
+                       if ( ! err && lev != b->lev )
+                               err = log_msg( ERR_TRASH, "bad lev %d in block %d want %d",
+                                       b->lev, b->num, lev );
+                       if ( err ) {
+                               Block **bp = &bt->hash[ num % bt->hlen ];
+                               while ( *bp && *bp != b )
+                                       bp = &(*bp)->nhs;
+                               if ( *bp == b )
+                                       *bp = b->nhs; /* dehash */
+                               b->lev = 0; /* add to level 0 lru list */
+                               b->num = 0; /* tell others that it isn't the expected block */
+                       }
+               }
+               /* add to it's levels lru list */
+               llev = LBT_LRULEV > b->lev ? b->lev : LBT_LRULEV-1;
+               if ( bt->mru[ llev ] )
+                       bt->mru[ llev ]->nru = b;
+               else 
+                       bt->lru[ llev ] = b;
+               (bt->mru[ llev ] = b)->nru = 0;
+               if ( b->wrd )
+                       (void)LIO_WAKE( num );
+               if ( err ) {
+                       b->wrd = b->wlk = 0; /* mark block unused */
+                       return 0;
+               }
+       }       /* got free block */
+       if ( LBT_WR == lock ) {
+               if ( b->lck ) {
+                       log_msg( LOG_FATAL, "bt consi: unexpected lock on %u", num );
+                       return 0;
+               }
+               b->lck = 1;
+       }
+       return b;
+}      /* getblk */
+
+
+/** binsearch position for key.
+       position is either the index of the matching entry (0..b->ent-1)
+       or the index (0..b->ent) where key should be inserted.
+       @return position for key (0..b->ent), |ifyed with LBT_MATCH, if found
+*/
+static unsigned short keypos (
+#if LBT_USECMP
+       Idx *bt,
+#define KEYPOS( bt, b, key ) keypos( bt, b, key )
+#else
+#define KEYPOS( bt, b, key ) keypos( b, key )
+#endif
+       Block *b, Key *key )
+{
+       unsigned char *base = LBT_BASE( b );
+       unsigned char *kbt = key->byt;
+       unsigned char  kln = key->len;
+       unsigned short l = 0, r = b->ent;
+       Dict          *d = b->dct;
+       /* binsearch invariant:
+               key is < all entries i >= r (we do know relation at r)
+               key is > all entries i < l (don't know relation at l)
+       */
+       while ( l < r ) { /* invariant: l <= keypos < r */
+               unsigned short i = (l + r) >> 1; /* l <= i < r */
+               unsigned short pos = LBT_POS(&d[i]);
+               unsigned char cln = d[i].kln;
+               int cmp =
+#if LBT_USECMP
+                       bt->cmp ? bt->cmp( kbt, base+pos, kln<cln ? kln : cln ) :
+#endif
+                       memcmp( kbt, base+pos, kln<cln ? kln : cln );
+               stat.cmp++; /* dirty */
+               if ( ! cmp && !(cmp = (int)kln - cln) ) {
+                       /* match on main key -- what about the value ? */
+                       if ( ! b->lev /* ignore on leaves -- no duplicate keys here */
+                               || (!d[i].vln && !key->val.len) /* no value */
+                       )
+                               return LBT_MATCH | i;
+                       /* so we're on an inner node */
+                       if ( ! d[i].vln ) /* common case */
+                               cmp = 1;
+                       else if ( ! key->val.len )
+                               cmp = -1;
+                       else {
+                               unsigned char vln = LBT_VLN(&d[i]);
+                               int c = key->val.len < d[i].vln ? key->val.len : vln;
+                               if ( !(cmp = memcmp( key->val.byt, base+pos+cln, c ))
+                                       && !(cmp = (int) key->val.len - vln)
+                               )
+                               return LBT_MATCH | i;
+                       }
+               }
+               if ( cmp > 0 )
+                       l = i+1;
+               else
+                       r = i;
+       }
+       return l; /* == r */
+}      /* keypos */
+
+
+/** find position for (leaf) val.
+       search v in array b of n values of size sz
+       @return position for value (0..n), |ifyed with LBT_MATCH, if found
+*/
+static unsigned short valpos ( unsigned char *b,
+       unsigned short n, unsigned char sz, unsigned char *v )
+{
+       unsigned short l = 0, r = n;
+       /* binsearch invariant:
+               v is < all values i >= r (we do know relation at r)
+               v is > all values i < l (don't know relation at l)
+       */
+       while ( l < r ) { /* invariant: l <= valpos < r */
+               unsigned short i = (l + r) >> 1; /* l <= i < r */
+               int cmp = memcmp( v, b+i*sz, sz );
+               if ( ! cmp )
+                       return LBT_MATCH | i;
+               if ( cmp > 0 )
+                       l = i+1;
+               else
+                       r = i;
+       }
+       return l; /* == r */
+}      /* valpos */
+
+
+#ifndef NDEBUG
+static void TRACEKEY ( unsigned lev, FindKey *f )
+{
+       int i;
+       unsigned n = 0;
+       Hit h;
+       PrVal pv;
+
+       if ( LOG_DONT( lev ) )
+               return;
+       log_msg( lev, "key '%.*s' vln %d val %s",
+               f->key->len, f->key->byt, f->key->val.len,
+               PRVAL( pv, f->key->val ) );
+       if ( f->key->val.len ) {
+               rdval( &h, f->key->val.byt, f->key->val.len );
+               log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d", 
+                       h.dbn, h.mfn, h.tag, h.occ, h.pos );
+       }
+       for ( i=LBT_LEVELS; i--; ) {
+               Block *b = f->b[i];
+               unsigned on = n, e;
+               Dict *d;
+               if ( ! b  )
+                       continue;
+               d = b->dct + (e = (LBT_ENTMASK & f->e[i]));
+               if ( i )
+                       n = GETINT( LBT_BASE(b)+d->pos+d->kln+d->vln );
+               log_msg( lev, "lev %d n %d e 0x%x = %d %c '%.*s' val %d %s -> %d%s",
+                       i, f->n[i], f->e[i], e, (LBT_MATCH & f->e[i]) ? 'M' : '-',
+                       d->kln, LBT_BASE(b)+d->pos, d->vln,
+                       prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln ),
+                       n, on == b->num ? "" : " LY!"
+               );
+               if ( d->vln ) {
+                       rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
+                       log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d", 
+                               h.dbn, h.mfn, h.tag, h.occ, h.pos );
+               }
+               if ( e ) {
+                       d--;
+                       log_msg( lev, "\tleft '%.*s' val %d %s",
+                               d->kln, LBT_BASE(b)+d->pos, d->vln,
+                               prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln )
+                       );
+                       if ( d->vln ) {
+                               rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
+                               log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d", 
+                                       h.dbn, h.mfn, h.tag, h.occ, h.pos );
+                       }
+               }
+       }
+}      /* TRACEKEY */
+#else
+#define TRACEKEY( l, f ) (void)(f)
+#endif
+
+
+static int findkey ( FindKey *f, lbt_get lock )
+{
+       int i;
+       Block *b = f->bt->root;
+       unsigned char *pkey = 0, pkln = 0, pvln = 0;
+       if ( b->lev >= LBT_LEVELS )
+               return log_msg( ERR_TRASH, "tree is too deep: %u", b->lev );
+       stat.key++;
+       for ( i=LBT_LEVELS; i--; ) { /* paranoia */
+               f->b[i] = 0;
+               f->n[i] = f->e[i] = 0;
+       }
+       for ( i=b->lev;; ) {
+               unsigned short e;
+               unsigned  n;
+               Dict *d;
+#ifndef NDEBUG
+               chkblk( b, f->bt );
+#endif
+               f->n[i] = (f->b[i] = b)->num;
+               f->e[i] = e = KEYPOS( f->bt, b, f->key );
+               LOG_DBG( LOG_DEBUG, "keypos %x", e );
+#ifndef NDEBUG
+               if ( pkey && b->ent ) {
+                       /* compare parent key against our first key
+                               pkey must be equal for inner node and not greater for leave
+                       */
+                       int cmp; /* this - pkey >= 0 */
+                       unsigned char *ent, ln;
+                       d = b->dct;
+                       ent = LBT_BASE(b)+LBT_POS(d);
+                       ln = pkln < d->kln ? pkln : d->kln;
+                       cmp = ln ? memcmp( ent, pkey, ln ) : 0;
+                       if ( ! cmp )
+                               cmp = (int)d->kln - (int)pkln;
+                       if ( ! cmp ) {
+                               if ( ! d->vln || ! pvln )
+                                       cmp = (int)d->vln - (int)pvln;
+                               else
+                                       cmp = memcmp( ent+d->kln, pkey+pkln, f->bt->vsz );
+                       }
+                       if ( cmp && (i || 0 > cmp) ) {
+                               Hit h;
+                               PrVal pv;
+                               TRACEKEY( ERR_TRASH, f );
+                               if ( d->vln ) {
+                                       rdval( &h, ent+d->kln, d->vln );
+                                       log_msg( ERR_TRASH, "\tfirst val db %d mfn %d tag %d occ %d pos %d", 
+                                               h.dbn, h.mfn, h.tag, h.occ, h.pos );
+                               }
+                               return log_msg( ERR_TRASH, "\tbad first '%.*s' val %d %s",
+                                       d->kln, ent, d->vln, prval( pv, ent+d->kln, d->vln ) );
+                       }
+               }
+#endif
+               /*
+                       here goes the Lehmann/Yao race detection:
+                       If we ran on right boundary, follow right link
+               */
+               while ( !(LBT_MATCH & e) && b->ent == e && b->nxt && b->num ) {
+                       Block *s;
+                       unsigned se;
+                       b->lck = LBT_WR; /* hold b while checking s */
+                       s = getblk( f->bt, b->nxt, i, i ? LBT_RD : lock );
+                       if ( i || ! lock )
+                               ULK( b );
+                       else
+                               b->lck = lock;
+                       se = KEYPOS( f->bt, s, f->key );
+                       if ( !se ) { /* lower than first */
+                               ULK( s );
+                               break;
+                       }
+                       LOG_DBG( LOG_DEBUG, "LYao %d -> %d lev %d for '%.*s' got 0x%x",
+                               b->num, s->num, i, f->key->len, f->key->byt, se );
+                       /* trade b for s */
+                       ULK( b );
+                       f->n[i] = (f->b[i] = b = s)->num;
+                       e = se;
+               }
+               f->e[i] = e;
+               if ( ! i )
+                       return 0;
+               if ( !(LBT_MATCH & e) ) {
+                       if ( e )
+                               f->e[i] = --e;
+                       else {
+                               TRACEKEY( ERR_TRASH, f );
+                               return log_msg( ERR_IDIOT,
+                                       "ran on left bound of block %u ent %u",
+                                       b->num, b->ent );
+                       }
+               }
+               d = b->dct + (LBT_ENTMASK & e);
+               /* direct access ok in inner node */
+               pkey = LBT_BASE(b)+d->pos;
+               pkln = d->kln;
+               pvln = d->vln;
+               n = GETINT( pkey+pkln+pvln );
+               i--;
+               b = getblk( f->bt, n, i, i ? LBT_RD : lock );
+       }
+       return -1; /* go away */
+}      /* findkey */
+
+
+static int mktree ( Idx *bt )
+{
+       Batch *ba = bt->bat;
+       unsigned char lev = 0; /* the level to be indexed */
+       unsigned left = 1; /* block on child level */
+       /* stats */
+       unsigned size = bt->len * bt->bsz;
+       unsigned sval = ba->vals * bt->vsz;
+       unsigned sove = bt->len * LBT_DOFF + sizeof(Dict)*(ba->keys+ba->span);
+       unsigned iblk = 0;
+       unsigned ikey = 0;
+       unsigned ikeyl = 0;
+       unsigned ipfx = 0;
+       unsigned ispn = 0;
+
+       log_msg( LOG_INFO,
+               "end batch:\n"
+               "\t#blks %u total len %u free %u (%.1f%%) overhead %u (%.1f%%)\n"
+               "\t#keys %u total len %u (%.1f%%) avg %.1f max %u\n"
+               "\t#vals %u total len %u (%.1f%%) avg %.1f max %u spans %u",
+               bt->len, size, ba->free, ba->free*100./size, sove, sove*100./size,
+               ba->keys, ba->keyl, ba->keyl*100./size, (float)ba->keyl/ba->keys, ba->maxl,
+               ba->vals, sval, sval*100./size, (float)ba->vals/ba->keys, ba->maxv, ba->span
+       );
+
+       /* build tree */
+       for (;;) { /* levels */
+               unsigned  len = 0; /* current is n'th block on level */
+               Block        *b = 0;
+               unsigned  have = 0; /* room in current block b */
+               unsigned char*base = 0; /* of current block b */
+               Dict         *d = 0; /* of current block b */
+               unsigned next = left; /* child */
+
+               left = bt->len;
+               /* get child c and create child pointer in block b */
+               while ( next ) {
+                       unsigned char  vln = 0;
+                       unsigned char *key;
+                       unsigned char  kln;
+                       unsigned  need;
+                       Block *c = getblk( bt, next, lev, LBT_EX );
+                       if ( ! c )
+                               return log_msg( ERR_NOMEM, "no child block" );
+                       next = c->nxt;
+                       if ( ! c->ent ) /* OOPS empty node */
+                               continue;
+                       key = LBT_BASE(c)+LBT_POS(c->dct);
+                       kln = c->dct[0].kln;
+                       if ( ! b ) /* the leftmost child on this level */
+                               kln = 0; /* always 0 key on left */
+                       else if ( lev )
+                               vln = LBT_VLN(c->dct);
+                       else if ( ba->key.len == kln /* compare key to last of prev. */
+                               && !memcmp( ba->key.byt, key, kln ) /* which was saved in ba->key */
+                       ) { /* key spans blocks */
+                               LOG_DBG( LOG_DEBUG, "found span key '%.*s' child %u",
+                                       kln, key, c->num );
+                               vln = bt->vsz; /* qualify with value */
+                       } else {
+#if 1
+                               unsigned char x = 0;
+                               /* find prefix key */
+                               if ( kln > ba->key.len )
+                                       kln = ba->key.len;
+                               while ( x < kln && key[x] == ba->key.byt[x] )
+                                       x++;
+                               if ( x < kln ) /* they differ at [x] */
+                                       kln = x+1;
+                               else /* equal on the shorter length */
+                                       kln++;
+                               if ( kln > c->dct[0].kln )
+                                       return log_msg( ERR_TRASH,
+                                               "bad key '%.*s' on level %u matching previous of length %u",
+                                               c->dct[0].kln, key, lev, ba->key.len );
+                               LOG_DBG( LOG_DEBUG, "prefix on key '%.*s' child %u save %u",
+                                       kln, key, c->num, c->dct[0].kln - kln );
+                               ipfx += c->dct[0].kln - kln;
+#endif
+                       }
+                       if ( vln )
+                               ispn++;
+                       ikeyl += kln;
+                       need = sizeof(Dict) + kln + vln + 4;
+                       if ( need > have ) {
+                               if ( b ) {
+                                       log_msg( LOG_VERBOSE,
+                                               "level %u block %u is full %u ent stack %u",
+                                               lev, b->num, b->ent, b->stk );
+                                       b->nxt = bt->len; /* link */
+                                       b->lck = 0;
+                                       if ( WRULK( b, bt ) ) goto nowrite;
+                                       len++;
+                               }
+                               c->lck = LBT_EX; /* paranoia */
+                               b = NEWBLK( bt, lev+1, LBT_EX );
+                               c->lck = 0;
+                               if ( ! b )
+                                       return log_msg( ERR_NOMEM, "no free block" );
+                               if ( b->ent || b->stk != bt->bsz )
+                                       return log_msg( ERR_TRASH,
+                                               "new block %u is not empty: ent %u stk %u",
+                                               b->num, b->ent, b->stk );
+                               base = LBT_BASE( b );
+                               b->lck = LBT_EX; /* lock */
+                               have = bt->bsz - LBT_DOFF; /* - ba->let; TODO: should we ??? */
+                               d = b->dct;
+                               len++;
+                               iblk++;
+                       }
+                       SETINT( base+(b->stk -= 4), c->num );
+                       d->kln = kln;
+                       d->vln = vln;
+                       if ( kln += vln ) /* key + consecutive qualifying value */
+                               memcpy( base+(b->stk -= kln), key, kln );
+                       d->pos = b->stk;
+                       d++;
+                       b->ent++;
+                       ikey++;
+                       have -= need;
+                       /* save the last dance for me */
+                       memcpy( ba->key.byt, LBT_BASE(c)+LBT_POS(&c->dct[c->ent-1]),
+                               ba->key.len = c->dct[c->ent-1].kln );
+               }
+               if ( !len )
+                       return log_msg( ERR_TRASH, "level %u was empty", lev );
+               /* close level */
+               log_msg( LOG_INFO, "closing level %u with %u blocks %u..%u",
+                       lev, len, left, b->num );
+               b->lck = 0;
+               if ( 1 == len ) {
+                       memcpy( bt->root, b, LBT_BOFF + bt->bsz );
+                       b = bt->root;
+                       b->num = 0;
+               }
+               if ( WRULK( b, bt ) ) goto nowrite;
+               lev++;
+               if ( 2 > len )
+                       break;
+       }
+       log_msg( LOG_INFO,
+               "\tused %u inner blocks %u keys %u bytes avg %.1f\n"
+               "\t%u #spans (%.1f%%) %u pfx (- %.1f%%)",
+               iblk, ikey, ikeyl, (float)ikeyl/ikey,
+               ispn, ispn*100.*bt->vsz/ikeyl, ipfx, ipfx*100./(ipfx+ikeyl) );
+
+       mFree( bt->bat );
+       bt->bat = 0;
+       return 0;
+nowrite:
+       return log_msg( LOG_SYSERR, "\twhen writing batch block" );
+}      /* mktree */
+
+
+
+/* ************************************************************
+       package data
+*/
+
+
+
+/* ************************************************************
+       package functions
+*/
+
+int lbt_batch ( Idx *bt, unsigned char pctfree )
+{
+       bt->bat = (Batch*)mAlloc( sizeof(Batch) );
+       if ( ! bt->bat )
+               return log_msg( ERR_NOMEM, "\twhen getting batch" );
+       memset( bt->bat, 0, sizeof(Batch) );
+       if ( pctfree > 50 )
+               pctfree = 50;
+       bt->bat->let = pctfree * bt->bsz / 100;
+       lio_trunc( &bt->fd, 0 );
+       iniblk( bt->root, bt, 0, 0 );
+       /* TODO: kill hash */
+       bt->len = 1;
+       bt->bat->cur = NEWBLK( bt, 0, LBT_EX );
+       return 0;
+}      /* lbt_batch */
+
+
+int lbt_batchval ( Idx *bt, Key *key )
+{
+       int cmp = 0;
+       Batch *ba = bt->bat;
+       if ( key != &ba->key ) {
+               int l = key->len < ba->key.len ? key->len : ba->key.len; /* min */
+               if ( ! (cmp = memcmp(key->byt, ba->key.byt, l)) )
+                       cmp = (int)key->len - (int)ba->key.len;
+               if ( 0 > cmp )
+                       return log_msg( LOG_ERROR, "batch key '%.*s' < last '%.*s'",
+                               key->len, key->byt, ba->key.len, ba->key.byt );
+       }
+       /* flush ? */
+       while ( ! key->val.len /* end */
+               || LBT_ENTMAX == ba->got /* full */
+               || cmp /* next key */
+       ) {
+               Block *b = ba->cur;
+               unsigned have = b->stk - LBT_DOFF - b->ent*sizeof(Dict) - ba->let;
+               unsigned need = sizeof(Dict) + ba->key.len;
+               unsigned want = need + ba->got * bt->vsz;
+               unsigned take = ba->got;
+               if ( have < want ) { /* you can't always get ... */
+                       if ( have <= need )
+                               take = 0;
+                       else {
+                               take = (have - need) / bt->vsz;
+                               if ( 256 > take * bt->vsz )
+                                       take = 0;
+                       }
+               }
+#if 0 /* prefix key turbo ??? -- doesn't seem to work all too well */
+               if ( have < (bt->bsz >> 1) ) { /* ready to waste half a block ? */
+                       Dict *d = &b->dct[ b->ent-1 ];
+                       unsigned char *last = LBT_BASE(b)+LBT_POS(d);
+                       if ( ba->key.byt[0] != last[0]
+                               || ba->key.byt[1] != last[1]
+                               || ba->key.byt[2] != last[2]
+                       )
+                               take = 0;
+               }
+#endif
+               LOG_DBG( LOG_DEBUG, "batch '%.*s' got %u need %u want %u have %u take %u",
+                       ba->key.len, ba->key.byt, ba->got, need, want, have, take );
+               /* we have enough room in b for take entries */
+               if ( take ) {
+                       unsigned char *base = LBT_BASE( b );
+                       unsigned char *s = base + b->stk;
+                       int i = take;
+                       Dict *d = &b->dct[ b->ent++ ];
+                       while ( i-- )
+                               memcpy( s -= bt->vsz, ba->val[i].byt, bt->vsz );
+                       memcpy( s -= ba->key.len, ba->key.byt, ba->key.len );
+                       b->stk = (unsigned short) (s - base);
+                       LBT_PV( d, b->stk, take );
+                       d->kln = ba->key.len;
+                       if ( ba->got > take )
+                               memcpy( ba->val, ba->val+take, sizeof(ba->val[0])*(ba->got - take) );
+                       ba->got -= take;
+               }
+               if ( ! ba->got )
+                       break;
+               if ( take ) /* key spans blocks */
+                       ba->span++;
+               ba->free += b->stk - LBT_DOFF - b->ent*sizeof(Dict); /* wasted */
+               b->nxt = bt->len; /* link */
+               LOG_DBG( LOG_DEBUG, "batch: %u entries left, getting block %u",
+                       ba->got, bt->len );
+               if ( WRULK( b, bt ) )
+                       return log_msg( LOG_SYSERR, "\twhen writing batch block" );
+               /* so couldn't flush all, thus we need another block */
+               b = ba->cur = NEWBLK( bt, 0, LBT_EX );
+               /* will continue on end or keychange */
+               if ( b->ent || b->stk < bt->bsz )
+                       return log_msg( ERR_TRASH, "new block %u is not empty: ent %u stk %u",
+                               b->num, b->ent, b->stk );
+               LOG_DBG( LOG_VERBOSE, "new block %u has %u ent %u stk, will keep %u",
+                       b->num, b->ent, b->stk, ba->let );
+       }
+       if ( cmp || ! key->val.len ) /* done with key: stats */
+               if ( ba->maxv < ba->tot )
+                       ba->maxv = ba->tot;
+       if ( key->val.len ) {
+               if ( cmp ) {    /* set new key */
+                       if ( ba->got )
+                               return log_msg( ERR_IDIOT, "batch not empty on new key" );
+                       ba->tot = 0;
+                       memcpy( ba->key.byt, key->byt, ba->key.len = key->len );
+                       ba->keys++;
+                       ba->keyl += key->len;
+                       if ( ba->maxl < key->len )
+                               ba->maxl = key->len;
+               }
+               /* add new val */
+               /* LOG_HEX( key->val.byt, bt->vsz ); */
+               memcpy( ba->val[ ba->got++ ].byt, key->val.byt, bt->vsz );
+               ba->vals++;
+               ba->tot++;
+       } else {
+               WRULK( ba->cur, bt );
+               mktree( bt );
+       }
+       
+       return 0;
+}      /* lbt_batchval */
+
+
+/* TODO: this function is an awful mess and needs complete rewrite, KR
+*/
+int lbt_add ( Idx *bt, Key *key )
+{
+       int ret = 0;
+       unsigned short e, i = 0;
+       Block *b, *s = 0; /* mainblock, sibling */
+       Dict *d;
+       unsigned char *base, *sbase, *src, *v, *stk;
+       FindKey f;
+       int hadkey, split = 0, insibling = 0;
+       unsigned need;
+       PrVal pv;
+
+       f.bt  = bt;
+       f.key = key;
+       if ( bt->key && key->len > bt->key )
+               key->len = bt->key;
+       if ( findkey( &f, LBT_WR ) ) {
+               if ( f.b[0] )
+                       ULK( f.b[0] );
+               b = 0;
+               LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
+       }
+       e = (LBT_ENTMASK & f.e[0]);
+       b = f.b[0];
+       base = LBT_BASE(b);
+       d = b->dct + e;
+       if ( ! (hadkey = LBT_MATCH & f.e[0]) ) /* new key */
+               need = key->len + sizeof(Dict) + bt->vsz;
+       else {
+               unsigned vln = LBT_VLN(d);
+               v = base+LBT_POS(d)+d->kln;
+               if ( 1 == vln ) { /* check stopword -- value entirely zeroed */
+                       int l = bt->vsz;
+                       unsigned char *C = v;
+                       while ( l-- && !*C++ ) /* huh huh, he said "C++" */
+                               ;
+                       if ( -1 == l ) {
+                               LOG_DBG( LOG_VERBOSE, "key '%.*s' is a stopword", key->len, key->byt );
+                               goto done;
+                       }
+               }
+               i = valpos( v, vln, bt->vsz, key->val.byt );
+               if ( LBT_MATCH & i ) {
+                       LOG_DBG( LOG_DEBUG,
+                               "adding key '%.*s'/%s: already there pos %u",
+                               key->len, key->byt, PRVAL( pv, key->val ), i & ~LBT_MATCH );
+                       goto done; /* value already there */
+               }
+               need = bt->vsz;
+       }
+       LOG_DBG( LOG_DEBUG,
+               "adding key '%.*s'/%s in block %d had %d ent %d stk %d need %d",
+               key->len, key->byt, PRVAL( pv, key->val ),
+               b->num, hadkey, b->ent, b->stk, need );
+
+       if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
+               /* now that's bad: not enough space
+                       create new block s, move last entries there
+                       starting from the end, entries (including the new one) are collected,
+                       until their combined size reaches more than half a block.
+                       If the combined size then is more than 2/3 a block,
+                       and the last entry alone takes more than 1/3 a block, it is split.
+                       TODO: check existing sibling for free space
+               */
+               unsigned half  = (bt->bsz - LBT_DOFF) >> 1;
+               unsigned tot   = 0;
+               unsigned mv = b->ent;
+
+               LOG_DBG( LOG_DEBUG, "splitting block %d need %d have %d (stk %d ent %d)",
+                       b->num, need, b->stk - LBT_DOFF - b->ent*sizeof(Dict), b->stk, b->ent );
+               if ( need > (bt->bsz - LBT_DOFF) / 3 )
+                       LOG_OTO( done, (ERR_IDIOT, "OOPS! excessive need for %d bytes", need) );
+               /* figure out split position */
+               if ( !hadkey && b->ent == e ) {
+                       tot = need;
+                       insibling = !0;
+               }
+               d = b->dct+mv;
+               while ( d--, mv-- ) {
+                       unsigned sz = sizeof(Dict)+d->kln+LBT_VLN(d)*bt->vsz;
+                       if ( half > (tot += sz) ) { /* sz fits */
+                               if ( e == mv && !hadkey ) {
+                                       /*
+                                               check size of new entry.
+                                               In case of hadkey, we don't bother for the bt->vsz bytes.
+                                       */
+                                       if ( half < (tot + need) ) {
+                                               if ( (half - tot) > need/2 ) { /* more balanced if we move */
+                                                       tot += need;
+                                                       insibling = !0;
+                                               }
+                                               break;
+                                       }
+                                       tot += need;
+                                       insibling = !0;
+                               }
+                               if ( mv )
+                                       continue; /* else croak on ! mv below */
+                       }
+                       /* exceeded half -- maybe we move entry mv anyway */
+                       if ( tot > (bt->bsz - LBT_DOFF)*2/3 ) {
+                               if ( tot-sz < (bt->bsz - LBT_DOFF)/3 )
+                                       split = !0;
+                               else { /* undo -- don't move this entry */
+                                       tot -= sz;
+                                       d++; mv++;
+                               }
+                       } else if ( ! mv )
+                               LOG_OTO( done, (ERR_IDIOT, "OOPS! didn't find end tot %d bytes", tot) );
+                       break;
+               }       /* while mv-- */
+               if ( mv < e || (mv == e && hadkey && !split) )
+                       insibling = !0;
+               /* now mv is the first entry to move */
+               s = NEWBLK( bt, 0, LBT_WR );
+               LOG_DBG( LOG_DEBUG,
+                       "split: move %d of %d to new blk %d e %d tot %d insibling %d split %d",
+                       mv, b->ent, s->num, e, tot, insibling, split );
+               s->nxt = b->nxt;
+               b->nxt = s->num;
+               sbase = LBT_BASE(s);
+               if ( split ) {
+                       /* now total includes full size of entry mv.
+                               if we included a new entry, it might even exceed bsz.
+                               calculate sizes for both blocks w/o mv's values.
+                       */
+                       unsigned esz = sizeof(Dict)+d->kln; /* entry's base cost */
+                       unsigned vln = LBT_VLN(d);
+                       unsigned keep, diff, mvval, kpval, mvsiz, pos = LBT_POS(d);
+                       /*
+                               from currently used space, substract tot (including all
+                               values and possibly new key "need"), but add entry base cost.
+                               that should be the size remaining if we where moving
+                               all of the values but keep an entry and key.
+                               maybe slightly biased if need is bt->vsz (the hadkey case).
+                       */
+                       keep = bt->bsz - b->stk + b->ent*sizeof(Dict) + need - tot + esz;
+                       tot -= vln * bt->vsz; /* undo entries */
+                       diff = (keep > tot ? keep - tot : tot - keep) / bt->vsz;
+                       if ( diff > vln )
+                               LOG_OTO( done, (ERR_IDIOT,
+                                       "OOPS! got diff %d vln %d entry %d tot %d keep %d",
+                                       diff, vln, mv, tot, keep) );
+                       mvval = (vln - diff)/2;
+                       if ( keep > tot )
+                               mvval += diff;
+                       kpval = vln - mvval;
+                       if ( ! mvval || ! kpval ) {
+                               LOG_DBG( LOG_WARN, "bad mv/kpval %d/%d keep %d tot %d vln %d diff %d",
+                                       mvval, kpval, keep, tot, vln, diff );
+                               if ( ! mvval )
+                                       kpval = vln - (mvval = 1);
+                               else
+                                       mvval = vln - (kpval = 1);
+                       }
+                       tot += mvsiz = mvval*bt->vsz;
+                       keep += kpval*bt->vsz;
+                       src = base+pos;
+                       LOG_DBG( LOG_DEBUG,
+                               "split: entry %d '%.*s' mv %d of %d values, i %d keep %d tot %d",
+                               mv, d->kln, src, mvval, vln, i, keep, tot );
+                       if ( keep > (bt->bsz-LBT_DOFF) || tot > (bt->bsz-LBT_DOFF) )
+                               LOG_OTO( done, (ERR_IDIOT,
+                                       "OOPS! got diff %d vln %d mvval %d entry %d tot %d keep %d",
+                                       diff, vln, mvval, mv, tot, keep) );
+                       /* SIGH -- do it */
+                       memcpy( sbase + (s->stk -= mvsiz),
+                               src+d->kln+kpval*bt->vsz, mvsiz );
+                       if ( e == mv && hadkey && i > kpval ) {
+                               insibling = !0;
+                               s->stk -= bt->vsz;
+                               i -= kpval;
+                               memmove( sbase+s->stk, sbase+s->stk+bt->vsz, i*bt->vsz );
+                               memcpy( sbase+s->stk+i*bt->vsz, key->val.byt, bt->vsz );
+                               mvval++;
+                       }
+                       memcpy( sbase + (s->stk -= d->kln), src, d->kln );
+                       LBT_PVK( s->dct, s->stk, mvval, d->kln );
+                       s->ent = 1;
+                       memmove( src + mvsiz, src, d->kln+kpval*bt->vsz );
+                       pos += mvsiz;
+                       LBT_PV( d, pos, kpval );
+                       d++; mv++;
+               }
+               /* now entries mv..b->ent are moved, mv > 0 */
+               b->stk = LBT_POS(b->dct+mv-1); /* pre-adjust stack */
+               if ( mv < e ) { /* move entries mv..e-1 */
+                       /* used stack space is from pos of entry e-1 to end of entry mv
+                               (might differ from pos of mv-1 due to split)
+                       */
+                       unsigned from = LBT_POS(b->dct+e-1), nent = e-mv,
+                               sz = LBT_POS(b->dct+mv)+b->dct[mv].kln
+                                       +LBT_VLN(b->dct+mv)*bt->vsz - from;
+                       int adj = (s->stk -= sz) - from; /* entry position adjustment */
+                       memcpy( sbase + s->stk, base+from, sz );
+                       memcpy( s->dct+s->ent, b->dct+mv, nent*sizeof(Dict) );
+                       d = s->dct + s->ent;
+                       s->ent += nent;
+                       for ( ; nent--; d++ )
+                               LBT_POSADD( d, adj );
+               }
+               if ( insibling && mv <= e ) { /* now mv or insert e, unless done on split */
+                       d = b->dct + e;
+                       if ( hadkey ) { /* mv existing entry e >= mv, add new val */
+                               /* i is valpos */
+                               unsigned vln = LBT_VLN(d);
+                               src = base+LBT_POS(d)+d->kln;
+                               if ( i < vln ) {
+                                       unsigned sz = (vln - i)*bt->vsz;
+                                       memcpy( sbase + (s->stk -= sz), src + i*bt->vsz, sz );
+                               }
+                               memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
+                               if ( i ) {
+                                       unsigned sz = i*bt->vsz;
+                                       memcpy( sbase + (s->stk -= sz), src, sz );
+                               }
+                               memcpy( sbase + (s->stk -= d->kln), base+LBT_POS(d), d->kln );
+                               LBT_PVK( s->dct + s->ent, s->stk, vln+1, d->kln );
+                               e++; /* e is moved */
+                       } else {
+                               memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
+                               memcpy( sbase + (s->stk -= key->len), key->byt, key->len );
+                               LBT_PVK( s->dct + s->ent, s->stk, 1, key->len );
+                       }
+                       s->ent++;
+               } /* mv or insert e */
+               if ( e < b->ent && mv < b->ent ) { /* move entries max(e,mv) to end */
+                       unsigned fst = mv < e ? e : mv;
+                       /* used stack space is from pos of entry b->ent-1 (the original b->stk)
+                               to end of entry fst (might differ from pos of fst-1 due to split)
+                       */
+                       unsigned from = LBT_POS(b->dct+b->ent-1), nent = b->ent-fst,
+                               sz = LBT_POS(b->dct+fst)+b->dct[fst].kln
+                                       +LBT_VLN(b->dct+fst)*bt->vsz - from;
+                       int adj = (s->stk -= sz) - from; /* entry position adjustment */
+                       LOG_DBG( LOG_DEBUG, "split: entries %d..%d sz %d from %d to %d",
+                               fst, b->ent-1, sz, from, s->stk );
+                       memcpy( sbase + s->stk, base+from, sz );
+                       memcpy( s->dct+s->ent, b->dct+fst, nent*sizeof(Dict) );
+                       d = s->dct + s->ent;
+                       s->ent += nent;
+                       for ( ; nent--; d++ ) {
+#if !defined(NDEBUG)
+                               unsigned p = LBT_POS(d);
+#endif
+                               LBT_POSADD( d, adj );
+#if !defined(NDEBUG)
+                               LOG_DBG( LOG_DEBUG, "new ent %d p/v/k %d/%d/%d o %d",
+                                       d - s->dct, LBT_POS(d), LBT_VLN(d), d->kln, p );
+#endif
+                       }
+               }
+               b->ent = mv; /* kill entries */
+               LOG_DBG( LOG_DEBUG,
+                       "split: blks %d/%d with %d/%d entries, s starts '%.*s'",
+                       b->num, s->num, b->ent, s->ent, s->dct[0].kln, sbase+LBT_POS(s->dct) );
+       }       /* new sibling */
+
+       if ( ! insibling ) {
+               /* now we do have enough space */
+               d = b->dct + e;
+               stk = base + b->stk; /* bottom of data stack */
+               if ( hadkey ) { /* insert value at i */
+                       unsigned char *val = base + LBT_POS(d)+d->kln+i*bt->vsz;
+                       need = bt->vsz;
+#if 0
+               LOG_DBG( LOG_DEBUG,
+                       "in entry %d '%.*s' val %d: memmove( %p %p %d) val %p need %d",
+                       e, d->kln, base+LBT_POS(d), i,
+                       stk-need, stk, (val-stk)+need, val, need );
+#endif
+                       memmove( stk - need, stk, (val - stk)+need );
+                       memcpy( val - need, key->val.byt, bt->vsz );
+                       LBT_VLNADD( d, 1 );
+               } else { /* insert key, value and dict entry */
+                       /* take dict and data space from entry e */
+                       unsigned char *nu;
+                       need = key->len + bt->vsz;
+                       if ( e == b->ent )
+                               nu = base + b->stk - need;
+                       else {
+                               /* at end of next entry, i.e. end of block or beg of previous */
+                               nu = base + LBT_POS(d) + d->kln + LBT_VLN(d)*bt->vsz;
+                               memmove( stk - need, stk, nu - stk );
+                               nu -= need;
+                               memmove( d+1, d, (b->ent - e)*sizeof(Dict) );
+                       }
+                       memcpy( nu, key->byt, key->len );
+                       memcpy( nu + key->len, key->val.byt, bt->vsz );
+                       LBT_PV( d, nu-base, 1 );
+                       d->kln = key->len;
+                       d++;
+                       e++;
+                       b->ent++;
+               }
+               /*
+                       need is set to the needed (i.e. additionally used) data space,
+                       e is the first dict entry to redirect need bytes lower
+               */
+               b->stk -= need;
+               for ( ; e < b->ent; e++, d++ )
+                       LBT_POSADD( d, -need );
+       }       /* ! insibling */
+       while ( s ) { /* created sibling */
+               unsigned num = s->num;
+               unsigned char *e0 = LBT_BASE(s) + LBT_POS(s->dct);
+               unsigned l = s->lev + 1;
+               Block *ins;
+               Key k;
+
+               memcpy( k.byt, e0, k.len = s->dct[0].kln );
+               k.val.len = 0;
+               if ( s->lev ? s->dct[0].vln : split )
+                       memcpy( k.val.byt, e0+k.len, k.val.len = bt->vsz );
+               LOG_DBG( LOG_DEBUG, "split: had %d/%d on lev %d key '%.*s' vln %d %s",
+                       b->num, num, s->lev, k.len, k.byt, k.val.len, PRVAL( pv, k.val ) );
+
+               /* get parent */
+               if ( f.b[l]->num == f.n[l] && ! LBT_BUSY(f.b[l]) )
+                       f.b[l]->lck = LBT_WR;
+               else
+                       f.b[l] = getblk( bt, f.n[l], l, LBT_WR );
+               LOG_DBG( LOG_DEBUG,
+                       "split: got parent %d lev %d", f.b[l]->num, f.b[l]->lev );
+               /* MT TODO: again L/Y link hunt */
+               /* ok, locked the parent -- release s and b */
+               if ( (ret = WRULK( s, bt )) || (ret = WRULK( b, bt )) )
+                       break;
+               s = 0;
+               ins = b = f.b[l];
+               base = LBT_BASE(b);
+               e = KEYPOS( bt, b, &k );
+               LOG_DBG( LOG_DEBUG, "split: got parent pos %d", e );
+               if ( LBT_MATCH & e ) /* !??? */
+                       LOG_OTO( done, (ERR_TRASH, "OOPS! found key in parent" ) );
+               if ( ! e ) {
+                       TRACEKEY( ERR_TRASH, &f );
+                       LOG_OTO( done, (ERR_TRASH, "OOPS! parent insert at 0 lev %d", l ) );
+               }
+               need = sizeof(Dict) + k.len + 4 + k.val.len;
+               if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
+                       unsigned half = ((bt->bsz - b->stk)+b->ent*sizeof(Dict)+need) / 2;
+                       unsigned mv = b->ent, tot = 0, nent, sz;
+                       int adj;
+                       if ( e == b->ent ) {
+                               tot = need;
+                               ins = 0;
+                       }
+                       for ( d = b->dct+mv; d--, mv--; ) {
+                               sz = sizeof(Dict) + d->kln + 4 + d->vln;
+                               if ( half > (tot += sz) ) {
+                                       if ( e == mv ) {
+                                               if ( half > tot+need || tot+need-half < half-tot ) {
+                                                       tot += need;
+                                                       ins = 0; /* new link goes to sibling */
+                                               } else
+                                                       break;
+                                               if ( half <= tot )
+                                                       break;
+                                       }
+                                       if ( mv )
+                                               continue;
+                               }
+                               if ( tot - half > sz/2 ) { /* don't move this */
+                                       tot -= sz;
+                                       d++; mv++;
+                               } else if ( ! mv ) /* !??? */
+                                       mv = 1;
+                               break;
+                       }
+                       /* split here a little bit simpler */
+                       s = NEWBLK( bt, l, LBT_WR );
+                       s->nxt = b->nxt;
+                       b->nxt = s->num;
+                       /* straight move entries mv..b->ent-1 to sibling */
+                       sbase = LBT_BASE(s);
+                       d = b->dct + mv;
+                       sz = b->dct[mv-1].pos - b->stk;
+                       nent = b->ent - mv;
+                       LOG_DBG( LOG_DEBUG,
+                               "split: parent lev %d split %d to new %d mv %d"
+                               " '%.*s'/%s nent %d sz %d",
+                               l, b->num, s->num, mv, d->kln, base+d->pos,
+                               prval( pv, base+d->pos+d->kln, d->vln ), nent, sz );
+                       memcpy( sbase + (s->stk -= sz), base + b->stk, sz );
+                       adj = s->stk - b->stk;
+                       b->stk += sz;
+                       memcpy( s->dct, d, nent*sizeof(Dict) );
+                       b->ent = mv;
+                       s->ent = nent;
+                       for ( d = s->dct; nent--; d++ )
+                               d->pos += adj;
+                       if ( ! ins ) {
+                               ins = s;
+                               e -= mv;
+                       }
+                       if ( (ret = chkblk( s, bt )) )
+                               break;
+               }
+               /* now insert link in block ins at pos e */ {
+                       unsigned isz = k.len + 4 + k.val.len, nent = ins->ent - e;
+                       unsigned char *ibase = LBT_BASE(ins);
+                       unsigned end = e ? ins->dct[e-1].pos : bt->bsz;
+                       LOG_DBG( LOG_DEBUG,
+                               "split: ins parent %d lev %d pos %d key '%.*s' -> %d vln %d %s",
+                               ins->num, ins->lev, e, k.len, k.byt, num, k.val.len,
+                               PRVAL( pv, k.val ) );
+                       memmove( ibase + ins->stk - isz, ibase + ins->stk, end - ins->stk );
+                       ins->stk -= isz;
+                       memcpy( ibase + (end -= 4), &num, 4 );
+                       if ( k.val.len )
+                               memcpy( ibase + (end -= k.val.len), k.val.byt, k.val.len );
+                       memcpy( ibase + (end -= k.len), k.byt, k.len );
+                       for ( d = ins->dct+ins->ent; nent--; d-- ) {
+                               *d = d[-1];
+                               d->pos -= isz;
+                       }
+                       d->pos = end;
+                       d->vln = k.val.len;
+                       d->kln = k.len;
+                       ins->ent++;
+                       if ( (ret = chkblk( ins, bt )) )
+                               break;
+               }
+               if ( ! s ) /* write b below or in next turn */
+                       break;
+               if ( ! b->num ) { /* root split */
+                       Block *bb = NEWBLK( bt, l, LBT_WR );
+                       LOG_DBG( LOG_DEBUG, "root split!" );
+                       /* cp all but the num */
+                       memcpy( LBT_BASE(bb)+4, LBT_BASE(b)+4, bt->bsz-4 );
+                       b->nxt = s->nxt; /* restore root's special nxt */
+                       s->nxt = 0;
+                       b->stk = bt->bsz; /* empty the root */
+                       /* add two childs bb and s */
+                       SETINT( base + (b->stk -= 4), bb->num );
+                       LBT_PVK( b->dct, b->stk, 0, 0 );
+                       SETINT( base + (b->stk -= 4), s->num );
+                       e0 = LBT_BASE(s) + LBT_POS(s->dct);
+                       if ( s->dct[0].vln )
+                               memcpy( base + (b->stk -= bt->vsz), e0+s->dct[0].kln, bt->vsz );
+                       memcpy( base + (b->stk -= s->dct[0].kln), e0, s->dct[0].kln );
+                       LBT_PVK( b->dct+1, b->stk, s->dct[0].vln, s->dct[0].kln );
+                       b->ent = 2;
+                       b->lev++; /* increase tree depth */
+                       if ( (ret = WRULK( s, bt )) || (ret = WRULK( bb, bt )) )
+                               break;
+                       s = 0;
+                       break;
+               }
+       }
+       if ( b && !ret )
+               ret = WRULK( b, bt );
+done:
+       if ( s )
+               ULK( s );
+       if ( b )
+               ULK( b );
+       if ( !ret && b->lev && findkey( &f, LBT_RD ) )
+               ret = log_msg( ERR_TRASH, "could not find key after add" );
+       return ret;
+}      /* lbt_add */
+
+
+int lbt_del ( Idx *bt, Key *key )
+{
+       int ret = 0;
+       unsigned short e, n, i, mv;
+       Block *b;
+       Dict *d;
+       unsigned char *base, *v;
+       FindKey f;
+       PrVal pv;
+
+       f.bt  = bt;
+       f.key = key;
+       LOG_DBG( LOG_DEBUG, "deleting key '%.*s'/%s",
+               key->len, key->byt, PRVAL( pv, key->val ) );
+       if ( findkey( &f, LBT_WR ) )
+               LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
+       if ( !(LBT_MATCH & f.e[0]) ) /* key not found - ok */
+               goto done;
+       LOG_DBG( LOG_DEBUG, "key found" );
+       e = (LBT_ENTMASK & f.e[0]);
+       b = f.b[0];
+       d = b->dct + e;
+       base = LBT_BASE(b);
+       v = base+LBT_POS(d)+d->kln; /* start of values */
+       n = LBT_VLN(d);
+       if ( ! n )
+               goto done;
+       i = valpos( v, n, bt->vsz, key->val.byt );
+       LOG_DBG( LOG_DEBUG, "val %sfound pos %u of %u",
+               (LBT_MATCH & i) ? "" : "not ", i & ~LBT_MATCH, n );
+       if ( !(LBT_MATCH & i) ) /* val not found - ok */
+               goto done;
+       i &= ~LBT_MATCH;
+       LOG_DBG( LOG_DEBUG,
+               "del blk %d e %d i/n %d/%d", b->num, e, i, n );
+       if ( 1 < n ) {
+               v += i*bt->vsz; /* end of stack area to move: start of this value */
+               mv = bt->vsz; /* amount to move: one value size */
+               LBT_VLNADD( d, -1 );
+       } else { /* sole value -- rm whole entry */
+               v = base+LBT_POS(d); /* start of key */
+               mv = d->kln+bt->vsz;
+               for ( i=b->ent - e - 1; i--; d++ ) /* move the i entries after d down */
+                       *d = d[1];
+               d = b->dct + e; /* reset */
+               b->ent--;
+       }
+       memmove( base+b->stk+mv, base+b->stk, v-(base+b->stk) );
+       b->stk += mv;
+       for ( ; e++ < b->ent; d++ )
+               LBT_POSADD( d, mv );
+       WRULK( b, bt );
+done:
+       if ( f.b[0] )
+               ULK( f.b[0] );
+       return ret;
+}      /* lbt_del */
+
+
+
+int lbt_loop ( Idx *bt, DXLoop *l )
+{
+       int ret = 0;
+       unsigned i = 0;
+       int mode = IDXMODE & l->flg;
+       Block *b = 0;
+       Key *cmp = &l->key;
+       if ( ! cmp->len ) /* start at 1st leaf */
+               b = getblk( bt, 1, 0, LBT_RD );
+       else { /* locate key */
+               FindKey f;
+               f.bt  = bt;
+               f.key = cmp;
+               if ( findkey( &f, LBT_RD ) )
+                       LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
+               b = f.b[0];
+               i = (unsigned)(LBT_ENTMASK & f.e[0]);
+               if ( !(LBT_MATCH & f.e[0]) && IDXEQ == mode )
+                       goto done;
+       }
+       if ( IDXUPTO <= mode )
+               cmp = &l->to;
+       for ( ;; i=0 ) { /* blocks */
+               unsigned char *base = LBT_BASE( b );
+               Dict *d;
+               if ( ! b || b->lev )
+                       return -ERR_TRASH;
+               b->lck = LBT_EX; /* lock */
+               d = b->dct + i;
+               for ( ; i<b->ent; i++, d++ ) {
+                       unsigned short pos = LBT_POS( d );
+                       unsigned short vln = LBT_VLN( d );
+                       unsigned short kln = d->kln;
+                       unsigned short j;
+                       unsigned char *v = base+pos+kln;
+                       Key key;
+                       Hit hit;
+                       int diff = memcmp( cmp->byt, base+pos,
+                               kln <= cmp->len ? kln : cmp->len );
+                       switch ( mode ) {
+                       case IDXPF:
+                               if ( diff || kln < cmp->len )
+                                       goto moan;
+                               break;
+                       case IDXEQ:
+                               if ( diff || kln != cmp->len )
+                                       goto done;
+                               break;
+                       case IDXUPTO:
+                               if ( 0 >= diff )
+                                       goto done;
+                               break;
+                       case IDXINCL:
+                               if ( 0 > diff || (!diff && kln > cmp->len) )
+                                       goto done;
+                               break;
+                       default: /* unused */
+                               moan:
+                               log_msg( LOG_INFO,
+                                       "loop ends on key '%.*s'(%d) %d %d",
+                                       kln, base+pos, kln, diff, cmp->len );
+                               goto done;
+                       }
+                       memcpy( key.byt, base+pos, key.len = kln );
+                       if ( l->cb )
+                               for ( j=0; j<vln; j++, v += bt->vsz ) {
+                                       rdval( &hit, v, bt->vsz );
+                                       ret = l->cb( l->me, &key, &hit );
+                                       if ( ret )
+                                               goto done;
+                               }
+                       else { /* internal check */
+                               FindKey f;
+                               f.bt  = bt;
+                               f.key = &key;
+                               memcpy( key.val.byt, v, key.val.len = bt->vsz ); 
+                               if ( findkey( &f, LBT_RD ) )
+                                       LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
+                               if ( !(LBT_MATCH & f.e[0])
+                                       || b != f.b[0]
+                                       || i != (unsigned)(LBT_ENTMASK & f.e[0])
+                               ) {
+                                       log_msg( ERR_TRASH,
+                                               "OOPS! want %u(%p).%u got %u(%p).%u (match %x)",
+                                               b->num, b, i, f.b[0]->num, f.b[0], LBT_ENTMASK & f.e[0], f.e[0] );
+                                       rdval( &hit, v, bt->vsz );
+                                       log_msg( ERR_TRASH,
+                                               "\twhen looking for '%.*s'/%u.%u.%u.%u",
+                                               key.len, key.byt, hit.mfn, hit.tag, hit.occ, hit.pos );
+                                       if ( f.b[0]->ent <= (LBT_ENTMASK & f.e[0]) )
+                                               log_msg( ERR_TRASH, "\tI found nothing" );
+                                       else {
+                                               Dict *dd = f.b[0]->dct + (LBT_ENTMASK & f.e[0]);
+                                               unsigned char *bb = LBT_BASE(f.b[0])+LBT_POS(dd);
+                                               rdval( &hit, bb+dd->kln, bt->vsz );
+                                               log_msg( ERR_TRASH,
+                                                       "\tI found '%.*s'/%u.%u.%u.%u",
+                                                       dd->kln, bb, hit.mfn, hit.tag, hit.occ, hit.pos );
+                                       }
+                               } else
+                                       log_msg( LOG_TRACE, "key ok" );
+                       }
+               }
+               ULK( b );
+               if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
+                       break;
+       }
+done:
+       if ( b )
+               ULK( b );
+       if ( l->cb )
+               l->cb( l->me, 0, 0 );
+       else
+               log_msg( LOG_INFO,
+                       "checked %i keys in %u blks of %dK depth %u cmp/get/miss %u/%u/%u",
+                       stat.key, bt->len, bt->bsz>>10, bt->root->lev, stat.cmp, stat.get, stat.miss );
+       return ret;
+}      /* lbt_loop */
+
+
+/* mimik the plain old ldb_search */
+int lbt_search ( Idx *bt, Key *key, LdbPost *post, Rec *rec )
+{
+       int ret = 0;
+       unsigned i = 0;
+       int cont = rec && rec->len; /* continuing previous terms search */
+       char *stk = rec ? ((char*)rec + rec->bytes) : 0; /* term stack */
+       int prefix;
+       Block *b = 0;
+       Key start;
+       LdbP * const p = post->p; /* really just an alias */
+
+       /* check for prefix match */
+       if ( post ) {
+               if ( 8 != bt->vsz )
+                       return log_msg( ERR_INVAL, "bad legacy search on vsz %d", bt->vsz );
+               prefix = LDB_PFX & post->mode;
+       } else {
+               if ( ! rec )
+                       return log_msg( ERR_INVAL, "lbt_search needs rec or post" );
+               if ( (prefix = key->len && '$' == key->byt[key->len - 1]) )
+                       key->len--;
+       }
+
+       if ( ! cont )
+               start = *key;
+       else {
+               memset( &start, 0, sizeof(start) );
+               start.len = 255 < rec->field[rec->len-1].len
+                       ? 255 : rec->field[rec->len-1].len;
+               memcpy( start.byt, rec->field[rec->len-1].val, start.len );
+               rec->len = 0;
+       }
+               
+       if ( ! start.len ) /* start at 1st leaf */
+               b = getblk( bt, 1, 0, LBT_RD );
+       else { /* locate key */
+               FindKey f;
+               f.bt  = bt;
+               f.key = &start;
+               if ( findkey( &f, LBT_RD ) )
+                       LOG_OTO( ouch, ( ERR_TRASH, "could not find key" ) );
+               b = f.b[0];
+               i = (unsigned)(LBT_ENTMASK & f.e[0]);
+               if ( !(LBT_MATCH & f.e[0]) && ! prefix )
+                       goto done;
+       }
+
+       for ( ;; i=0 ) { /* blocks */
+               unsigned char *base = LBT_BASE( b );
+               Dict *d;
+               if ( ! b || b->lev )
+                       return -ERR_TRASH;
+               b->lck = LBT_EX; /* lock */
+               d = b->dct + i;
+               for ( ; i<b->ent; i++, d++ ) { /* entries */
+                       unsigned char *ent = base + LBT_POS( d ); /* start of key */
+                       unsigned short kln = d->kln;
+                       if ( memcmp( key->byt, ent, kln <= key->len ? kln : key->len )
+                               || (prefix ? kln < key->len : kln != key->len )
+                       )
+                               goto done;
+                       if ( rec ) { /* record term */
+                               Field *f = rec->field + rec->len; /* field to assign */
+                               /** append term to record, unless it's the same as previous */
+                               if ( rec->len ) {
+                                       if ( kln == f[-1].len && !memcmp( f[-1].val, ent, kln ) )
+                                               continue;
+                               } else if ( cont ) {
+                                       if ( kln == start.len && !memcmp( start.byt, ent, kln ) )
+                                               continue;
+                               }
+                               stk -= kln;
+                               if ( stk < (char*)(f+1) ) /* stack reached field dict */
+                                       goto done;
+                               memcpy( stk, ent, kln );
+                               f->tag = 0;
+                               f->val = stk;
+                               f->len = kln;
+                               rec->len++;
+                               continue;
+                       }
+                       { /* more locals */
+                       LdbP merge[1024];
+                       unsigned short vln, k, m=0;
+                       unsigned char *c;
+                       int f = post->fil-1; /* highest pos to consider in given postings */
+                       if ( !(vln = LBT_VLN( d )) )
+                               continue;
+                       ent += kln; /* set to start of values */
+                       c = ent + 8*(vln-1); /* last value */
+                       /* collect postings */
+                       for ( k=vln; k--; c-=8 ) {
+                               /* loop backwards (for the fun of it) postings in segment */
+                               int prow, ptag, ppos;
+                               LdbP e; /* the entry */
+                               LdbP samerow; /* highest possible entry w/ same row as e */
+#if defined( LDB_BIG_ENDIAN )
+                               /* the 8 bytes of a posting are BIG ENDIAN ! */
+                               memcpy(e.bytes,c,8);
+#else
+                               e.bytes[0] = c[7];      e.bytes[1] = c[6];
+                               e.bytes[2] = c[5];      e.bytes[3] = c[4];
+                               e.bytes[4] = c[3];      e.bytes[5] = c[2];
+                               e.bytes[6] = c[1];      e.bytes[7] = c[0];
+#endif
+                               prow = LDBP_ROW( &e );
+                               ptag = LDBP_TAG( &e );
+                               ppos = LDBP_POS( &e );
+                               LOG_DBG( LOG_VERBOSE, "post %d.%hd pos %06x", prow, ptag, ppos );
+                               if ( !prow )
+                                       continue;
+                               if ( (post->cut && prow >= post->cut)
+                                       || (post->tag && post->tag != ptag)
+                               )
+                                       continue;
+                               if ( prow < post->skp ) /* quickly bail out on skip condition */
+                                       break;
+                               LDBP_SETROWTOP( &samerow, &e ); /* for mfn comparison */
+                               /* sweep down to postings for the same row as e ... */
+                               while ( f >= 0 && LDBP_GT( p+f, &samerow ) )
+                                       f--;
+                               if ( LDB_AND & post->mode ) {
+                                       int l;
+                                       /* loop postings for same row, mark all (that are near enough) */
+                                       LDBP_SETROWBOT( &samerow, &e ); /* for mfn comparison */
+                                       /* NOTE: postings for row are GT than bottom even if marked */
+                                       for ( l = f; l>=0 && LDBP_GT( p+l, &samerow ); l-- ) {
+                                               if ( post->near ) {
+                                                       int dist;
+                                                       if ( ptag != LDBP_TAG( p+l ) ) continue;
+                                                       if ( LDB_NEAR_G != post->near ) {
+                                                               dist = LDBP_POS( p+l ) - LDBP_POS( &e );
+                                                               if ( dist < 0 ) dist = -dist;
+                                                               if ( 0 < post->near
+                                                                       ? post->near < dist
+                                                                       : -post->near != dist /* exact $$$$ */
+                                                               ) continue;
+                                                       }
+                                               }
+                                               LDBP_SETMARK( p+l );
+                                       }
+                               } else {        /* OR mode */
+                                       int add;
+                                       if ( ! post->near ) /* add if row not found: ignore details */
+                                               add = 0 > f || prow > LDBP_ROW( p+f );
+                                       else {  /* add if no exact match */
+                                               int l;
+                                               /* NOTE: we don't use mark bit in OR mode, do we ? */
+                                               for ( l = f; l>=0 && LDBP_GT( p+l, &e ); l-- )
+                                                       ;
+                                               add = 0 > l || LDBP_GT( &e, p+l );
+                                       }
+                                       if ( add )
+                                               merge[ m++ ] = e;
+                               }
+                       }       /* for postings in segment */
+                       if ( m ) { /* merge in the merge buffer */
+                               LdbP *mm = merge;
+                               for ( k = post->fil += m; m && k--; ) {
+                                       LdbP src;
+                                       if ( k < m || LDBP_GT( mm, &p[k-m] ) ) {
+                                               src = *mm++;
+                                               m--;
+                                               LOG_DBG( LOG_DEBUG, "merging %d at %d", LDBP_ROW(&src), k );
+                                       } else
+                                               src = p[k-m];
+                                       if ( k < post->len )
+                                               p[k] = src;
+                                       else { /* set cut */
+                                               int row = LDBP_ROW( &src );
+                                               if ( row < post->cut || !post->cut )
+                                                       post->cut = row;
+                                       }
+                               }
+                               if ( post->fil > post->len )
+                                       post->fil = post->len;
+                               if ( post->cut ) /* postings for cut row are unreliable */
+                                       while ( post->fil && post->cut <= LDBP_ROW(p+post->fil-1) )
+                                               post->fil--;
+                       } /* if ( m ) */
+                       } /* more locals */
+               }       /* for entries in block */
+               ULK( b );
+               if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
+                       break;
+       }
+done:
+       if ( rec )
+               ret = rec->len;
+ouch:
+       if ( b )
+               ULK( b );
+       return ret;
+}      /* lbt_search */
+
+
+int lbt_init ( Idx *bt )
+{
+       int ret = 0, blkbits, i;
+       Block *l;
+       int size = lio_size( bt->fd );
+
+       bt->root = 0;
+       bt->hash = 0;
+       bt->mem = 0;
+       for ( i=LBT_LRULEV; i--; )
+               bt->lru[i] = bt->mru[i] = 0;
+
+       if ( 0x3ff & size ) /* not on 1K boundary */
+               return log_msg( LOG_ERROR, "bt has bad size %dK + %d",
+                       size >> 10, (int)size & 0x3ff );
+       if ( size ) { /* read root header */
+               Block b;
+               unsigned want = sizeof(b) - LBT_BOFF;
+               int got = lio_pread( &bt->fd, LBT_BASE(&b), want, 0 );
+               if ( want != (unsigned)got )
+                       return log_msg( LOG_SYSERR, "could not get root head" );
+               if ( b.num )
+                       return log_msg( ERR_TRASH, "root has num %d", b.num );
+               /*if ( LBT_ISIS != b.nxt ) log_msg( LOG_WARN, "magic is 0x%08x", b.nxt );*/
+               if ( bt->typ && bt->typ != b.typ )
+                       log_msg( LOG_WARN, "typ want 0x02%x got 0x02%x", bt->typ, b.typ );
+               bt->typ = b.typ;
+               if ( bt->key && bt->key != b.key )
+                       log_msg( LOG_WARN, "key want %d got %d", bt->key, b.key );
+               bt->key = b.key;
+               if ( bt->col && bt->col != b.col )
+                       log_msg( LOG_WARN, "col want %d got %d", bt->col, b.col );
+               bt->col = b.col;
+               bt->dpt = b.lev;
+               /*
+               b.ent
+               b.stk
+               */
+       } else { /* new index */
+               bt->dpt = 1;
+               if ( ! bt->key )
+                       bt->key = 30;
+       }
+
+       bt->vsz = 8+(0xf & bt->typ);
+       blkbits = 10 + (3 & (bt->typ >> 4));
+       bt->bsz = 1 << blkbits;
+       if ( size % bt->bsz )
+               return log_msg( ERR_TRASH, "size 0x%08x % blksize", size, bt->bsz );
+       bt->len = size >> blkbits;
+       if ( 2 > bt->len && !(LBT_WRITE & bt->flg) )
+               return log_msg( LOG_SYSERR, "attempt to read empty index" );
+
+       bt->hlen = 16 + bt->len / 32;   /* initial cache size */
+       if ( bt->hlen < 1000 ) bt->hlen = 1000;
+       if ( !(bt->root = addchunk( bt, bt->hlen )) )
+               return log_msg( LOG_SYSERR, "\twhen getting bt cache" );
+       if ( bt->hlen < 256 ) /* the hash */
+               bt->hlen = 256;
+       if ( !(bt->hash = (Block**)mAlloc( bt->hlen * sizeof(Block*) )) )
+               return log_msg( LOG_SYSERR, "\twhen getting bt hash" );
+       memset( bt->hash, 0, bt->hlen * sizeof(Block*) );
+
+       if ( size && readblk( bt->root, bt, 0 ) )
+               return log_msg( ERR_TRASH, "could not read root" );
+       if ( ! bt->len ) {
+               /* init: link root to leaf */
+               SETINT( LBT_BASE(bt->root)+(bt->root->stk -= 4), 1 );
+               LBT_PVK( bt->root->dct, bt->root->stk, 0, 0 );
+               bt->root->ent = 1;
+               bt->len = 1;
+       }
+       if ( !(l = getblk( bt, 1, 0, LBT_EX )) )
+               return log_msg( ERR_TRASH, "could not read 1st leaf" );
+       if ( ! size ) {
+               log_msg( LOG_INFO, "creating index bsz %u ksz %u vsz %u",
+                       bt->bsz, bt->key, bt->vsz );
+               bt->root->lev = 1;
+               if ( WRULK( bt->root, bt ) || WRULK( l, bt ) )
+                       return log_msg( LOG_SYSERR, "\twhen writing base" );
+       }
+
+       return ret;
+}      /* lbt_init */
+
+
+void lbt_close ( Idx *bt )
+{
+       Chunk *c, *n = bt->mem;
+       while ( (c = n) ) {
+               n = c->nxt;
+               mFree( c );
+       }
+       mFree( bt->hash );
+       lio_close( &bt->fd, LIO_INOUT );
+       memset( bt, 0, sizeof(*bt) );
+}      /* lbt_close */
+
+
+/* ************************************************************
+       public functions
+*/
+
+
+void cXMkVal ( Idx *bt, Val *val, Hit *hit )
+{
+       if ( hit ) {
+               val->len = bt->vsz;
+               mkval( val, hit );
+       } else
+               val->len = 0;
+}
+
+int cXAdd ( Idx *bt, Key *key, Hit *hit )
+{
+       if ( bt->bat ) {
+               if ( ! key )
+                       key = &bt->bat->key;
+               cXMkVal( bt, &key->val, hit );
+               return lbt_batchval( bt, key );
+       } else if ( !key )
+               return 0;
+       else if ( hit ) {
+               cXMkVal( bt, &key->val, hit );
+               if ( hit->dbn == 0xffff )
+                       return lbt_del( bt, key );
+       }
+       return lbt_add( bt, key );
+}      /* cXAdd */