--- /dev/null
+/*
+ openisis - an open implementation of the CDS/ISIS database
+ Version 0.8.x (patchlevel see file Version)
+ Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+ see README for more information
+EOH */
+
+/*
+ $Id: lbt.c,v 1.35 2003/05/27 11:03:30 kripke Exp $
+ the btree.
+*/
+
+#include <stdlib.h>
+#include <string.h>
+
+/* special
+#if defined( __GNUC__ ) && defined ( alloca )
+#include <alloca.h>
+#endif
+*/
+#if defined( sparc ) || defined( __ppc__ )
+# define LDB_BIG_ENDIAN
+#endif
+
+#include "lio.h"
+#include "luti.h"
+#include "lbt.h"
+
+
+/*
+probably all relevant cases can be handled by the more
+efficient approach of storing recoded key a la strxfrm
+*/
+#define LBT_USECMP 0
+
+
+/* ************************************************************
+ private types
+*/
+
+enum {
+ /* little endian magic text */
+ LBT_OPEN = 'O' | 'P'<<8 | 'E'<<16 | 'N'<<24,
+ LBT_ISIS = 'I' | 'S'<<8 | 'I'<<16 | 'S'<<24
+};
+
+
+enum {
+ LBT_LEVELS = 16, /* max depth */
+ LBT_POSBITS = 13, /* bits for position within block */
+ LBT_POSMASK = 0x1fff,
+ LBT_POSMAX = LBT_POSMASK /* obviously :) */
+};
+
+typedef struct Dict { /* dictionary of block entries */
+ unsigned short pos; /* 13 bits position of entry, 2 bits val for leaves */
+ unsigned char vln; /* length of entry's value */
+ /* for inner nodes, this is the length of qualifying
+ second key segment (either 0 or bt->vsz <= 23).
+ for leaves, this, together with higher bits of pos,
+ is the number of values of bt->vsz each.
+ */
+ unsigned char kln; /* length of entry's key */
+/*
+ the following access macros are needed for leaves only
+*/
+#define LBT_POS( d ) (LBT_POSMASK & (d)->pos)
+#define LBT_VLN( d ) ((d)->vln | (~LBT_POSMASK & (d)->pos)>>(LBT_POSBITS-8))
+#define LBT_POSADD( d, a ) do { \
+ (d)->pos = (~LBT_POSMASK & (d)->pos) | (LBT_POSMASK & ((d)->pos + (a))); \
+ } while (0)
+#define LBT_VLNADD( d, a ) do { \
+ unsigned _vln = LBT_VLN( d ) + (a); \
+ (d)->pos = (LBT_POSMASK & (d)->pos) \
+ | (~LBT_POSMASK & ((_vln)<<(LBT_POSBITS-8))); \
+ (d)->vln = (unsigned char)(_vln); \
+ } while (0)
+#define LBT_PV( d, p, v ) do { \
+ (d)->pos = (LBT_POSMASK & (p)) | (~LBT_POSMASK & ((v)<<(LBT_POSBITS-8))); \
+ (d)->vln = (unsigned char)(v); \
+ } while (0)
+#define LBT_PVK( d, p, v, k ) do { \
+ LBT_PV( d, p, v ); \
+ (d)->kln = (unsigned char)(k); \
+ } while (0)
+} Dict;
+
+
+/* guard space around block */
+#ifndef NDEBUG
+# define LBT_GUARD 64
+/* template for guard space -- blank or pattern */
+static unsigned char guard[LBT_GUARD];
+#else
+# define LBT_GUARD 0
+#endif
+
+
+/*
+ TODO: check the possible performance gain in findkey
+ from separating disk block and administrative data
+ and aligning disk blocks on PAGE boundaries in memory
+*/
+typedef struct Block {
+#if LBT_GUARD
+ unsigned char grd[LBT_GUARD];
+#endif
+ unsigned char ird; /* is reading */
+ unsigned char wrd; /* #threads waiting for read */
+ unsigned char lck; /* is write locked */
+ unsigned char wlk; /* #threads waiting for write lock */
+#define LBT_BUSY( b ) (*(int*)&(b)->ird) /* any of 1st 4 bytes != 0 */
+ struct Block *nhs; /* next in hash chain */
+ struct Block *nru; /* next (more) recently used */
+ /* TODO: possibly double-link lru list, so we can unlink on every access ? */
+ /* the disk block -- from here on bsz bytes are on disk */
+#define LBT_BOFF ((char *)&((Block*)0)->num - (char*)0)
+#define LBT_BASE( b ) ((unsigned char *)&(b)->num)
+ unsigned num; /* number of this block, 0 is root */
+ unsigned nxt; /* right sibling, "OPEN" or "ISIS" for root */
+ /* typ,key,col are redundant in non-root blocks, maybe abused ... */
+ unsigned char typ; /* type: bsz, vsz, flags */
+ unsigned char key; /* max key length */
+ unsigned char col; /* collation */
+ unsigned char lev; /* level over bottom, 0 for leaves */
+ unsigned short ent; /* 10 bits # entries */
+#define LBT_ENT( b ) (b)->ent /* currently other bits arent abused */
+ unsigned short stk; /* 13 bits offset of keyspace stack */
+/* offset of dict RELATIVE to base */
+#define LBT_DOFF ((char *)&((Block*)0)->dct - (char*)&((Block*)0)->num)
+ Dict dct[1];
+} Block;
+
+/*
+ LAYOUT:
+ dct is an array of dictionary entries in increasing key order
+ stk is growing downwards from block end towards dict
+ position of entry i is b->stk <= LBT_POS(b->dct+i) < bt->bsz
+ decreasing on i (i.e. stack space is in reverse ordering)
+
+ at LBT_POS(b->dct+i), there are b->dct[i].kln bytes key
+ for a leaf entry, the key is followed by
+ LBT_VLN(b->dct+i) (between 0..1023 incl.) values
+ of bt->vsz (between 8..23 incl) each
+ for an inner entry, the key is followed by
+ 4 bytes block number and optionally bt->vsz qualifying value
+
+ SIZES:
+ block size >= 1024
+ block base overhead LBT_DOFF is 16 bytes
+ avail size >= 1008
+ Dict entry is 4 bytes
+ max. used stack space per entry is:
+ inner node: kln+4+bt->vsz <= 255+4+23 = 282
+ new leaf entry (one value): kln+bt->vsz <= 278
+ max total space for a new entry incl. dict is 286 < avail size/3
+*/
+
+typedef struct Chunk {
+ struct Chunk *nxt;
+} Chunk;
+
+
+enum {
+ /* number of levels for lru list */
+ LBT_LRULEV = sizeof(((Idx*)0)->lru)/sizeof(((Idx*)0)->lru[0]),
+ LBT_ENTBITS = 10, /* bits for # enries */
+ LBT_ENTMASK = 0x03ff,
+ LBT_ENTMAX = LBT_ENTMASK,
+ LBT_MATCH = 0x8000, /* entry matched */
+ LBT_ENTSIZE = 8 /* minimum cost of entry: 4 byte dict, 4 byte val */
+};
+
+
+typedef struct Batch {
+ Block *cur;
+ unsigned got; /* current for this key */
+ unsigned tot; /* total for this key */
+ unsigned let;
+ /* stats */
+ unsigned keys;
+ unsigned keyl;
+ unsigned maxl;
+ unsigned vals;
+ unsigned maxv;
+ unsigned span;
+ unsigned free;
+ /* buffer */
+ Key key;
+ Val val[LBT_ENTMAX];
+} Batch;
+
+
+typedef enum {
+ LBT_RD, /* read -- no lock */
+ LBT_WR, /* write */
+ LBT_EX /* exclusive */
+} lbt_get;
+
+
+typedef struct { /* structure filled by findkey */
+ Idx *bt; /* the tree */
+ Key *key; /* the key searched */
+ Block *b[LBT_LEVELS]; /* path to leaf, b[0] is leaf */
+ unsigned n[LBT_LEVELS]; /* block numbers of path */
+ unsigned short e[LBT_LEVELS]; /* entries in path */
+} FindKey;
+
+
+typedef struct {
+ unsigned get;
+ unsigned miss;
+ unsigned cmp;
+ unsigned key;
+} Stat;
+
+/* buffer to print a value to.
+ lowest 11 bytes are printed as 4 shorts + one int < 4*5+10 = 30 digits+5sep
+ higher 12 bytes are printed hex = 24 bytes.
+*/
+typedef char PrVal[64];
+
+
+/* ************************************************************
+ private data
+*/
+
+/*
+ stat access is not guarded by memory barrier.
+ may be wrong when running on multiple processors
+*/
+static Stat stat;
+
+
+/* ************************************************************
+ private functions
+*/
+
+#if defined( sparc )
+static unsigned GETINT ( void *m ) /* bus error safe version of *(int*) */
+{
+ unsigned l;
+ memcpy( &l, m, 4 );
+ return l;
+}
+static void SETINT ( void *m, unsigned i )
+{
+ memcpy( m, &i, 4 );
+}
+#else
+#define GETINT( m ) (*(unsigned*)(m))
+#define SETINT( m, i ) (*(unsigned*)(m) = (i))
+#endif
+
+static void mkval ( Val *v, Hit *ifp )
+{
+ unsigned char *c = v->byt;
+ switch ( v->len ) {
+ default: memset( c, 0, v->len-11 ); c += v->len-11;
+ case 11: *c++ = (unsigned char) (ifp->dbn >> 8);
+ case 10: *c++ = (unsigned char) ifp->dbn;
+ case 9: *c++ = (unsigned char) (ifp->mfn >> 24);
+ case 8: *c++ = (unsigned char) (ifp->mfn >> 16);
+ case 7: *c++ = (unsigned char) (ifp->mfn >> 8);
+ case 6: *c++ = (unsigned char) ifp->mfn;
+ case 5: *c++ = (unsigned char) (ifp->tag >> 8);
+ case 4: *c++ = (unsigned char) ifp->tag;
+ case 3: *c++ = (unsigned char) ifp->occ;
+ case 2: *c++ = (unsigned char) (ifp->pos >> 8);
+ case 1: *c++ = (unsigned char) ifp->pos;
+ case 0: ;
+ }
+} /* mkval */
+
+
+static Hit *rdval ( Hit *ifp, unsigned char *c, unsigned vsz )
+{
+ ifp->mfn = ifp->tag = ifp->occ = ifp->pos = ifp->dbn = 0;
+ switch ( vsz ) {
+ default: c += vsz-11;
+ case 11: ifp->dbn = (unsigned short)*c++ << 8;
+ case 10: ifp->dbn |= *c++;
+ case 9: ifp->mfn = (unsigned int)*c++ << 24;
+ case 8: ifp->mfn |= (unsigned int)*c++ << 16;
+ case 7: ifp->mfn |= (unsigned int)*c++ << 8;
+ case 6: ifp->mfn |= *c++;
+ case 5: ifp->tag = (unsigned short)*c++ << 8;
+ case 4: ifp->tag |= *c++;
+ case 3: ifp->occ = *c++;
+ case 2: ifp->pos = (unsigned short)*c++ << 8;
+ case 1: ifp->pos |= *c++;
+ case 0: ;
+ }
+ return ifp;
+} /* rdval */
+
+
+static char *prval ( PrVal buf, unsigned char *c, unsigned vsz )
+{
+ char *d = buf;
+ Hit h;
+ if ( ! vsz ) {
+ buf[0] = '#';
+ buf[1] = 0;
+ return buf;
+ }
+ if ( 23 < vsz )
+ vsz = 23;
+ if ( 11 < vsz ) {
+ while ( 11 < vsz-- ) {
+ *d++ = luti_hex[ *c >> 4 ];
+ *d++ = luti_hex[ *c++ & 0xf ];
+ }
+ *d++ = '.';
+ }
+ rdval( &h, c, vsz );
+ d += u2a( d, h.dbn ); *d++ = '.';
+ d += u2a( d, h.mfn ); *d++ = '.';
+ d += u2a( d, h.tag ); *d++ = '.';
+ d += u2a( d, h.occ ); *d++ = '.';
+ d += u2a( d, h.pos ); *d++ = '.';
+ *d = 0;
+ return buf;
+} /* prval */
+
+/* for PRVAL f->key->val */
+#define PRVAL( buf, v ) prval( buf, v.byt, v.len )
+
+
+/* ************************************************************
+ block I/O & cache mgmt
+*/
+
+static void iniblk ( Block *b, Idx *bt, unsigned num, unsigned char lev )
+{
+ memset( b, 0, LBT_BOFF + bt->bsz + LBT_GUARD );
+ b->num = num;
+ b->typ = bt->typ;
+ b->key = bt->key;
+ b->col = bt->col;
+ b->lev = lev;
+ b->stk = bt->bsz;
+} /* iniblk */
+
+
+static int chkblk ( Block *b, Idx *bt )
+{
+#if 0
+ (void)b; (void)bt;
+#else
+ int i;
+ unsigned short eod = LBT_DOFF + b->ent*sizeof(Dict), siz, pos, vln, key;
+ unsigned end = bt->bsz, tot = 0;
+ unsigned char *base = LBT_BASE(b);
+#if LBT_GUARD
+ if ( memcmp( guard, b->grd, LBT_GUARD ) )
+ return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
+ if ( memcmp( guard, LBT_BASE(b)+bt->bsz, LBT_GUARD ) )
+ return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
+#endif
+ if ( b->num > bt->len || (b->num && b->nxt > bt->len) )
+ return log_msg( ERR_TRASH,
+ "bad num %d / nxt %d on lev %d", b->num, b->nxt, b->lev );
+ if ( b->typ != bt->typ || b->key != bt->key || b->col != bt->col )
+ return log_msg( ERR_TRASH,
+ "bad sig 0x%02x,%d,%d in block %d want 0x%02x,%d,%d",
+ b->typ, b->key, b->col, b->num, bt->typ, bt->key, bt->col );
+ if ( b->ent > bt->bsz / LBT_ENTSIZE || b->stk > bt->bsz || b->stk < eod )
+ return log_msg( ERR_TRASH,
+ "bad ent/stk %d,%d in block %d eod %d sz %d",
+ b->ent, b->stk, b->num, eod, bt->bsz );
+ for ( i=0; i<b->ent; i++ ) {
+ Dict *d = &b->dct[i];
+ pos = LBT_POS( d );
+ vln = LBT_VLN( d );
+ key = d->kln;
+ siz = key + (b->lev ? vln+4u : vln * bt->vsz);
+ if ( pos < b->stk || end != (unsigned)pos + siz )
+ return log_msg( ERR_TRASH,
+ "bad pos/vln/key %d/%d/%d in entry %d of block %d"
+ " stack %d sz %d end %d expected %d",
+ pos, vln, key, i, b->num, b->stk, siz, pos+siz, end );
+ if ( b->lev ) {
+ unsigned lnk;
+ if ( vln && vln != +bt->vsz )
+ return log_msg( ERR_TRASH,
+ "bad vln %d in entry %d of block %d vsz %d",
+ vln, i, b->num, bt->vsz );
+ lnk = GETINT( base + pos + key + vln );
+ if ( lnk >= bt->len )
+ return log_msg( ERR_TRASH,
+ "bad link %u in entry %d of block %d",
+ lnk, i, b->num );
+ }
+#if 1 && ! defined( NDEBUG )
+ if ( i ) {
+ unsigned short op = LBT_POS( d-1 ), okl = d[-1].kln;
+ int mc = memcmp( base+pos, base+op, key<okl ? key : okl ), j;
+ if ( 0 < mc || (!mc && 0 < (mc = (int)key - (int)okl)) )
+ goto ok;
+ j = 0;
+ if ( !mc && b->lev && vln ) { /* compare values */
+ unsigned ovln = d[-1].vln;
+ if ( ! ovln || 0 < (j = memcmp( base+pos+key, base+op+okl, vln )) )
+ goto ok;
+ }
+ return log_msg( ERR_TRASH,
+ "bad key order entry %d of block %d lev %d"
+ " mc %d j %d key %d okl %d vln %d",
+ i, b->num, b->lev, mc, j, key, okl, vln
+ );
+ }
+ ok:
+#endif
+ tot += siz;
+ end = pos;
+ }
+ if ( tot > bt->bsz - b->stk )
+ return log_msg( ERR_TRASH,
+ "bad total entry size %d in block %d stack %d",
+ tot, b->num, b->stk );
+#ifndef NDEBUG
+ if ( LOG_DO( LOG_TRACE ) ) {
+ log_msg( LOG_TRACE,
+ "block %d lev %d sig 0x%02x,%d,%d ent %d eod %d stk %d",
+ b->num, b->lev, b->typ, b->key, b->col, b->ent, eod, b->stk
+ );
+ for ( i=0; i<2; i++ ) {
+ int j = i ? b->ent-1 : 0;
+ Dict *d = &b->dct[j];
+ if ( i >= b->ent ) /* if b->ent is 0 or 1, log no or one key */
+ break;
+ pos = LBT_POS( d );
+ vln = LBT_VLN( d );
+ key = d->kln;
+ end = pos + b->lev ? vln+4u : vln * bt->vsz;
+ /* start of value */
+ if ( b->lev )
+ log_msg( LOG_TRACE, "key %d '%.*s'/%d -> %d",
+ j, key, base+pos, vln, GETINT(base+pos+key+vln) );
+ else {
+ Hit h;
+ rdval( &h, base+pos+key, bt->vsz );
+ log_msg( LOG_TRACE, "key %d '%.*s'[%d] = %d[%d,%d,%d]",
+ j, key, base+pos, vln, h.mfn, h.tag, h.occ, h.pos );
+ }
+ }
+ }
+#endif
+#endif
+ return 0;
+} /* chkblk */
+
+
+static Block *addchunk ( Idx *bt, unsigned cnt )
+{
+ unsigned ibsz = LBT_BOFF + bt->bsz + LBT_GUARD; /* internal block size */
+ /* increase cnt so it's > 0 and add one extra to return */
+ char *mem = (char *)mAlloc( sizeof(Chunk) + (1 + ++cnt)*ibsz );
+ Chunk *chk = (Chunk *)mem;
+ if ( ! mem )
+ return 0;
+ /* chain the chunk */
+ chk->nxt = bt->mem;
+ bt->mem = chk;
+ mem += sizeof(Chunk);
+ bt->clen += cnt;
+ /* add cnt blocks to level 0 lru list */
+ if ( ! bt->mru[0] ) /* level 0 cache is empty */
+ bt->mru[0] = (Block *)mem;
+ for ( ; cnt--; mem += ibsz ) {
+ Block *b = (Block *)mem;
+ iniblk( b, bt, 0 , 0 ); /* initialise empty block */
+ b->nru = bt->lru[0];
+ bt->lru[0] = b;
+ }
+ iniblk( (Block *)mem, bt, 0 , 0 );
+ return (Block *)mem;
+} /* addchunk */
+
+
+static int readblk ( Block *b, Idx *bt, unsigned num )
+{
+ int got = lio_pread( &bt->fd, LBT_BASE(b), bt->bsz, num*bt->bsz );
+ log_msg( LOG_VERBOSE, "reading block %u got %u", num, got );
+ if ( bt->bsz != (unsigned)got )
+ return log_msg( LOG_SYSERR, "\twhen reading bt block %d", num );
+ if ( b->num != num )
+ return log_msg( ERR_TRASH, "bad num %d in block %d", b->num, num );
+ return chkblk( b, bt );
+} /* readblk */
+
+
+static void unlock ( Block *b )
+{
+ b->lck = 0;
+ if ( b->wlk )
+ (void)LIO_WAKE( b->num );
+}
+
+#define ULK( b ) do { \
+ if ( (b)->lck ) { (b)->lck = 0; if ( b->wlk ) unlock( b ); } \
+ } while (0)
+
+
+static int writeblk ( Block *b, Idx *bt, int ulk )
+{
+ int got;
+ if ( chkblk( b, bt ) ) /* don't write no shize */
+ return log_msg( ERR_TRASH, "\tbefore writing bt block %d", b->num );
+ got = lio_pwrite( &bt->fd, LBT_BASE(b), bt->bsz, b->num*bt->bsz );
+ log_msg( LOG_VERBOSE, "writing block %u got %u", b->num, got );
+ if ( ulk )
+ ULK( b );
+ if ( bt->bsz != (unsigned)got )
+ return log_msg( LOG_SYSERR, "\twhen writing bt block %d", b->num );
+ return 0;
+} /* writeblk */
+
+#define WRULK( b, bt ) writeblk( b, bt, 1 )
+
+
+#define NEWBLK( bt, lev, lock ) getblk( bt, bt->len, lev, lock )
+/**
+ get block #num.
+ 1) locate block #num in cache.
+ 2) if it is found:
+ if it is READING, or we want to lock and it is locked,
+ incr # waiters, wait for it.
+ 3) if it is not found:
+ locate a free cache entry, extending the cache if necessary,
+ set the num, set the READING flag, go read it.
+ after reading, if there are waiters, wake them up.
+ since we are the first to get a hold on the block,
+ we may write lock it, if we want.
+ 4) return the block.
+*/
+static Block *getblk ( Idx *bt, unsigned num,
+ unsigned char lev, lbt_get lock )
+{
+ Block *b;
+ stat.get++; /* dirty */
+ if ( !(stat.get & 0x3fff) ) { /* every 16K */
+ static unsigned lmiss;
+ log_msg( LOG_INFO,
+ "searched %i keys in %u blks cache %d %d%%"
+ " depth %u cmp/get/miss %u/%u/%u %d%% (%d %d%%)",
+ stat.key, bt->len, bt->clen, bt->clen*100 / bt->len,
+ bt->root->lev, stat.cmp, stat.get,
+ stat.miss, stat.miss*100 / stat.get,
+ (stat.miss - lmiss), (stat.miss - lmiss)*100 / 0x4000
+ );
+ lmiss = stat.miss;
+ }
+startover: /* only used on absurd congestion */
+ b = 0;
+ LOG_DBG( LOG_DEBUG, "get bt block %u %u %d", num, lev, lock );
+ if ( ! num )
+ b = bt->root;
+ else if ( num < bt->len ) {
+ b = bt->hash[ num % bt->hlen ];
+ while ( b && num != b->num )
+ b = b->nhs;
+ }
+ if ( b )
+ while ( b->ird || (lock && b->lck) ) {
+ int wrd = b->ird;
+ int wlk = lock && b->lck;
+ LOG_DBG( LOG_VERBOSE, "lck on block %u %u r%d l%d",
+ num, lev, b->ird, b->lck );
+ if ( LBT_EX <= lock ) {
+ log_msg( LOG_ERROR, "concurrent mod on EX lock" );
+ return 0;
+ }
+ if ( ! lio_lock ) {
+ log_msg( LOG_FATAL, "bad MT environment: concurrent mod, no lock" );
+ return 0;
+ }
+ if ( wrd && 0xff != b->wrd )
+ b->wrd++;
+ if ( wlk && 0xff != b->wlk )
+ b->wlk++;
+ (void)LIO_WAIT( num ); /* zzzzz .... sleep tight */
+ /* some hours later */
+ if ( num != b->num ) {
+ log_msg( LOG_WARN, "heavy congestion: block %d has gone", num );
+ goto startover;
+ }
+ if ( wrd && b->wrd )
+ b->wrd--;
+ if ( wlk && b->wlk )
+ b->wlk--;
+ }
+ else { /* not found or new block */
+ /* find free block */
+ int i = 0, llev, err = 0;
+ stat.miss++; /* dirty */
+ LOG_DBG( LOG_DEBUG, "bt block %u %u not found", num, lev );
+ for ( ; i < (int)LBT_LRULEV; i++ ) {
+ /* check lru list for level i */
+ Block **bp = &bt->lru[i];
+ while ( (b = *bp) && LBT_BUSY(b) )
+ bp = &b->nru;
+ if ( ! b ) /* try next level */
+ continue;
+ *bp = b->nru; /* dequeue */
+ if ( bt->mru[i] == b ) /* was the tail */
+ bt->mru[i] = 0;
+ break;
+ }
+ if ( ! b ) {
+ log_msg( LOG_WARN, "no free blocks in cache" );
+ if ( !(b = addchunk( bt, 64 )) )
+ return 0;
+ /* TODO: should we also extend the hash ? */
+ } else if ( b->num ) { /* got block -- should be hashed somewhere */
+ Block **bp = &bt->hash[ b->num % bt->hlen ];
+ while ( *bp && *bp != b )
+ bp = &(*bp)->nhs;
+ if ( *bp == b )
+ *bp = b->nhs; /* dehash */
+ /* else ... hmmm should have found it */
+ }
+ assert( ! LBT_BUSY( b ) );
+ iniblk( b, bt, num, lev );
+ /* hash it, so waiters can find it */
+ b->nhs = bt->hash[ b->num % bt->hlen ];
+ bt->hash[ b->num % bt->hlen ] = b;
+ if ( num == bt->len ) { /* extend */
+ b->lev = lev;
+ bt->len++;
+ /* TODO: should be configurable */
+ if ( bt->len > bt->clen << 5 && bt->clen < 0x4000 )
+ addchunk( bt, bt->clen / 4 );
+ } else {
+ b->ird = 1;
+ (void)LIO_RELE();
+ err = readblk( b, bt, num );
+ (void)LIO_LOCK();
+ b->ird = 0;
+ if ( ! err && lev != b->lev )
+ err = log_msg( ERR_TRASH, "bad lev %d in block %d want %d",
+ b->lev, b->num, lev );
+ if ( err ) {
+ Block **bp = &bt->hash[ num % bt->hlen ];
+ while ( *bp && *bp != b )
+ bp = &(*bp)->nhs;
+ if ( *bp == b )
+ *bp = b->nhs; /* dehash */
+ b->lev = 0; /* add to level 0 lru list */
+ b->num = 0; /* tell others that it isn't the expected block */
+ }
+ }
+ /* add to it's levels lru list */
+ llev = LBT_LRULEV > b->lev ? b->lev : LBT_LRULEV-1;
+ if ( bt->mru[ llev ] )
+ bt->mru[ llev ]->nru = b;
+ else
+ bt->lru[ llev ] = b;
+ (bt->mru[ llev ] = b)->nru = 0;
+ if ( b->wrd )
+ (void)LIO_WAKE( num );
+ if ( err ) {
+ b->wrd = b->wlk = 0; /* mark block unused */
+ return 0;
+ }
+ } /* got free block */
+ if ( LBT_WR == lock ) {
+ if ( b->lck ) {
+ log_msg( LOG_FATAL, "bt consi: unexpected lock on %u", num );
+ return 0;
+ }
+ b->lck = 1;
+ }
+ return b;
+} /* getblk */
+
+
+/** binsearch position for key.
+ position is either the index of the matching entry (0..b->ent-1)
+ or the index (0..b->ent) where key should be inserted.
+ @return position for key (0..b->ent), |ifyed with LBT_MATCH, if found
+*/
+static unsigned short keypos (
+#if LBT_USECMP
+ Idx *bt,
+#define KEYPOS( bt, b, key ) keypos( bt, b, key )
+#else
+#define KEYPOS( bt, b, key ) keypos( b, key )
+#endif
+ Block *b, Key *key )
+{
+ unsigned char *base = LBT_BASE( b );
+ unsigned char *kbt = key->byt;
+ unsigned char kln = key->len;
+ unsigned short l = 0, r = b->ent;
+ Dict *d = b->dct;
+ /* binsearch invariant:
+ key is < all entries i >= r (we do know relation at r)
+ key is > all entries i < l (don't know relation at l)
+ */
+ while ( l < r ) { /* invariant: l <= keypos < r */
+ unsigned short i = (l + r) >> 1; /* l <= i < r */
+ unsigned short pos = LBT_POS(&d[i]);
+ unsigned char cln = d[i].kln;
+ int cmp =
+#if LBT_USECMP
+ bt->cmp ? bt->cmp( kbt, base+pos, kln<cln ? kln : cln ) :
+#endif
+ memcmp( kbt, base+pos, kln<cln ? kln : cln );
+ stat.cmp++; /* dirty */
+ if ( ! cmp && !(cmp = (int)kln - cln) ) {
+ /* match on main key -- what about the value ? */
+ if ( ! b->lev /* ignore on leaves -- no duplicate keys here */
+ || (!d[i].vln && !key->val.len) /* no value */
+ )
+ return LBT_MATCH | i;
+ /* so we're on an inner node */
+ if ( ! d[i].vln ) /* common case */
+ cmp = 1;
+ else if ( ! key->val.len )
+ cmp = -1;
+ else {
+ unsigned char vln = LBT_VLN(&d[i]);
+ int c = key->val.len < d[i].vln ? key->val.len : vln;
+ if ( !(cmp = memcmp( key->val.byt, base+pos+cln, c ))
+ && !(cmp = (int) key->val.len - vln)
+ )
+ return LBT_MATCH | i;
+ }
+ }
+ if ( cmp > 0 )
+ l = i+1;
+ else
+ r = i;
+ }
+ return l; /* == r */
+} /* keypos */
+
+
+/** find position for (leaf) val.
+ search v in array b of n values of size sz
+ @return position for value (0..n), |ifyed with LBT_MATCH, if found
+*/
+static unsigned short valpos ( unsigned char *b,
+ unsigned short n, unsigned char sz, unsigned char *v )
+{
+ unsigned short l = 0, r = n;
+ /* binsearch invariant:
+ v is < all values i >= r (we do know relation at r)
+ v is > all values i < l (don't know relation at l)
+ */
+ while ( l < r ) { /* invariant: l <= valpos < r */
+ unsigned short i = (l + r) >> 1; /* l <= i < r */
+ int cmp = memcmp( v, b+i*sz, sz );
+ if ( ! cmp )
+ return LBT_MATCH | i;
+ if ( cmp > 0 )
+ l = i+1;
+ else
+ r = i;
+ }
+ return l; /* == r */
+} /* valpos */
+
+
+#ifndef NDEBUG
+static void TRACEKEY ( unsigned lev, FindKey *f )
+{
+ int i;
+ unsigned n = 0;
+ Hit h;
+ PrVal pv;
+
+ if ( LOG_DONT( lev ) )
+ return;
+ log_msg( lev, "key '%.*s' vln %d val %s",
+ f->key->len, f->key->byt, f->key->val.len,
+ PRVAL( pv, f->key->val ) );
+ if ( f->key->val.len ) {
+ rdval( &h, f->key->val.byt, f->key->val.len );
+ log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
+ h.dbn, h.mfn, h.tag, h.occ, h.pos );
+ }
+ for ( i=LBT_LEVELS; i--; ) {
+ Block *b = f->b[i];
+ unsigned on = n, e;
+ Dict *d;
+ if ( ! b )
+ continue;
+ d = b->dct + (e = (LBT_ENTMASK & f->e[i]));
+ if ( i )
+ n = GETINT( LBT_BASE(b)+d->pos+d->kln+d->vln );
+ log_msg( lev, "lev %d n %d e 0x%x = %d %c '%.*s' val %d %s -> %d%s",
+ i, f->n[i], f->e[i], e, (LBT_MATCH & f->e[i]) ? 'M' : '-',
+ d->kln, LBT_BASE(b)+d->pos, d->vln,
+ prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln ),
+ n, on == b->num ? "" : " LY!"
+ );
+ if ( d->vln ) {
+ rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
+ log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
+ h.dbn, h.mfn, h.tag, h.occ, h.pos );
+ }
+ if ( e ) {
+ d--;
+ log_msg( lev, "\tleft '%.*s' val %d %s",
+ d->kln, LBT_BASE(b)+d->pos, d->vln,
+ prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln )
+ );
+ if ( d->vln ) {
+ rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
+ log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
+ h.dbn, h.mfn, h.tag, h.occ, h.pos );
+ }
+ }
+ }
+} /* TRACEKEY */
+#else
+#define TRACEKEY( l, f ) (void)(f)
+#endif
+
+
+static int findkey ( FindKey *f, lbt_get lock )
+{
+ int i;
+ Block *b = f->bt->root;
+ unsigned char *pkey = 0, pkln = 0, pvln = 0;
+ if ( b->lev >= LBT_LEVELS )
+ return log_msg( ERR_TRASH, "tree is too deep: %u", b->lev );
+ stat.key++;
+ for ( i=LBT_LEVELS; i--; ) { /* paranoia */
+ f->b[i] = 0;
+ f->n[i] = f->e[i] = 0;
+ }
+ for ( i=b->lev;; ) {
+ unsigned short e;
+ unsigned n;
+ Dict *d;
+#ifndef NDEBUG
+ chkblk( b, f->bt );
+#endif
+ f->n[i] = (f->b[i] = b)->num;
+ f->e[i] = e = KEYPOS( f->bt, b, f->key );
+ LOG_DBG( LOG_DEBUG, "keypos %x", e );
+#ifndef NDEBUG
+ if ( pkey && b->ent ) {
+ /* compare parent key against our first key
+ pkey must be equal for inner node and not greater for leave
+ */
+ int cmp; /* this - pkey >= 0 */
+ unsigned char *ent, ln;
+ d = b->dct;
+ ent = LBT_BASE(b)+LBT_POS(d);
+ ln = pkln < d->kln ? pkln : d->kln;
+ cmp = ln ? memcmp( ent, pkey, ln ) : 0;
+ if ( ! cmp )
+ cmp = (int)d->kln - (int)pkln;
+ if ( ! cmp ) {
+ if ( ! d->vln || ! pvln )
+ cmp = (int)d->vln - (int)pvln;
+ else
+ cmp = memcmp( ent+d->kln, pkey+pkln, f->bt->vsz );
+ }
+ if ( cmp && (i || 0 > cmp) ) {
+ Hit h;
+ PrVal pv;
+ TRACEKEY( ERR_TRASH, f );
+ if ( d->vln ) {
+ rdval( &h, ent+d->kln, d->vln );
+ log_msg( ERR_TRASH, "\tfirst val db %d mfn %d tag %d occ %d pos %d",
+ h.dbn, h.mfn, h.tag, h.occ, h.pos );
+ }
+ return log_msg( ERR_TRASH, "\tbad first '%.*s' val %d %s",
+ d->kln, ent, d->vln, prval( pv, ent+d->kln, d->vln ) );
+ }
+ }
+#endif
+ /*
+ here goes the Lehmann/Yao race detection:
+ If we ran on right boundary, follow right link
+ */
+ while ( !(LBT_MATCH & e) && b->ent == e && b->nxt && b->num ) {
+ Block *s;
+ unsigned se;
+ b->lck = LBT_WR; /* hold b while checking s */
+ s = getblk( f->bt, b->nxt, i, i ? LBT_RD : lock );
+ if ( i || ! lock )
+ ULK( b );
+ else
+ b->lck = lock;
+ se = KEYPOS( f->bt, s, f->key );
+ if ( !se ) { /* lower than first */
+ ULK( s );
+ break;
+ }
+ LOG_DBG( LOG_DEBUG, "LYao %d -> %d lev %d for '%.*s' got 0x%x",
+ b->num, s->num, i, f->key->len, f->key->byt, se );
+ /* trade b for s */
+ ULK( b );
+ f->n[i] = (f->b[i] = b = s)->num;
+ e = se;
+ }
+ f->e[i] = e;
+ if ( ! i )
+ return 0;
+ if ( !(LBT_MATCH & e) ) {
+ if ( e )
+ f->e[i] = --e;
+ else {
+ TRACEKEY( ERR_TRASH, f );
+ return log_msg( ERR_IDIOT,
+ "ran on left bound of block %u ent %u",
+ b->num, b->ent );
+ }
+ }
+ d = b->dct + (LBT_ENTMASK & e);
+ /* direct access ok in inner node */
+ pkey = LBT_BASE(b)+d->pos;
+ pkln = d->kln;
+ pvln = d->vln;
+ n = GETINT( pkey+pkln+pvln );
+ i--;
+ b = getblk( f->bt, n, i, i ? LBT_RD : lock );
+ }
+ return -1; /* go away */
+} /* findkey */
+
+
+static int mktree ( Idx *bt )
+{
+ Batch *ba = bt->bat;
+ unsigned char lev = 0; /* the level to be indexed */
+ unsigned left = 1; /* block on child level */
+ /* stats */
+ unsigned size = bt->len * bt->bsz;
+ unsigned sval = ba->vals * bt->vsz;
+ unsigned sove = bt->len * LBT_DOFF + sizeof(Dict)*(ba->keys+ba->span);
+ unsigned iblk = 0;
+ unsigned ikey = 0;
+ unsigned ikeyl = 0;
+ unsigned ipfx = 0;
+ unsigned ispn = 0;
+
+ log_msg( LOG_INFO,
+ "end batch:\n"
+ "\t#blks %u total len %u free %u (%.1f%%) overhead %u (%.1f%%)\n"
+ "\t#keys %u total len %u (%.1f%%) avg %.1f max %u\n"
+ "\t#vals %u total len %u (%.1f%%) avg %.1f max %u spans %u",
+ bt->len, size, ba->free, ba->free*100./size, sove, sove*100./size,
+ ba->keys, ba->keyl, ba->keyl*100./size, (float)ba->keyl/ba->keys, ba->maxl,
+ ba->vals, sval, sval*100./size, (float)ba->vals/ba->keys, ba->maxv, ba->span
+ );
+
+ /* build tree */
+ for (;;) { /* levels */
+ unsigned len = 0; /* current is n'th block on level */
+ Block *b = 0;
+ unsigned have = 0; /* room in current block b */
+ unsigned char*base = 0; /* of current block b */
+ Dict *d = 0; /* of current block b */
+ unsigned next = left; /* child */
+
+ left = bt->len;
+ /* get child c and create child pointer in block b */
+ while ( next ) {
+ unsigned char vln = 0;
+ unsigned char *key;
+ unsigned char kln;
+ unsigned need;
+ Block *c = getblk( bt, next, lev, LBT_EX );
+ if ( ! c )
+ return log_msg( ERR_NOMEM, "no child block" );
+ next = c->nxt;
+ if ( ! c->ent ) /* OOPS empty node */
+ continue;
+ key = LBT_BASE(c)+LBT_POS(c->dct);
+ kln = c->dct[0].kln;
+ if ( ! b ) /* the leftmost child on this level */
+ kln = 0; /* always 0 key on left */
+ else if ( lev )
+ vln = LBT_VLN(c->dct);
+ else if ( ba->key.len == kln /* compare key to last of prev. */
+ && !memcmp( ba->key.byt, key, kln ) /* which was saved in ba->key */
+ ) { /* key spans blocks */
+ LOG_DBG( LOG_DEBUG, "found span key '%.*s' child %u",
+ kln, key, c->num );
+ vln = bt->vsz; /* qualify with value */
+ } else {
+#if 1
+ unsigned char x = 0;
+ /* find prefix key */
+ if ( kln > ba->key.len )
+ kln = ba->key.len;
+ while ( x < kln && key[x] == ba->key.byt[x] )
+ x++;
+ if ( x < kln ) /* they differ at [x] */
+ kln = x+1;
+ else /* equal on the shorter length */
+ kln++;
+ if ( kln > c->dct[0].kln )
+ return log_msg( ERR_TRASH,
+ "bad key '%.*s' on level %u matching previous of length %u",
+ c->dct[0].kln, key, lev, ba->key.len );
+ LOG_DBG( LOG_DEBUG, "prefix on key '%.*s' child %u save %u",
+ kln, key, c->num, c->dct[0].kln - kln );
+ ipfx += c->dct[0].kln - kln;
+#endif
+ }
+ if ( vln )
+ ispn++;
+ ikeyl += kln;
+ need = sizeof(Dict) + kln + vln + 4;
+ if ( need > have ) {
+ if ( b ) {
+ log_msg( LOG_VERBOSE,
+ "level %u block %u is full %u ent stack %u",
+ lev, b->num, b->ent, b->stk );
+ b->nxt = bt->len; /* link */
+ b->lck = 0;
+ if ( WRULK( b, bt ) ) goto nowrite;
+ len++;
+ }
+ c->lck = LBT_EX; /* paranoia */
+ b = NEWBLK( bt, lev+1, LBT_EX );
+ c->lck = 0;
+ if ( ! b )
+ return log_msg( ERR_NOMEM, "no free block" );
+ if ( b->ent || b->stk != bt->bsz )
+ return log_msg( ERR_TRASH,
+ "new block %u is not empty: ent %u stk %u",
+ b->num, b->ent, b->stk );
+ base = LBT_BASE( b );
+ b->lck = LBT_EX; /* lock */
+ have = bt->bsz - LBT_DOFF; /* - ba->let; TODO: should we ??? */
+ d = b->dct;
+ len++;
+ iblk++;
+ }
+ SETINT( base+(b->stk -= 4), c->num );
+ d->kln = kln;
+ d->vln = vln;
+ if ( kln += vln ) /* key + consecutive qualifying value */
+ memcpy( base+(b->stk -= kln), key, kln );
+ d->pos = b->stk;
+ d++;
+ b->ent++;
+ ikey++;
+ have -= need;
+ /* save the last dance for me */
+ memcpy( ba->key.byt, LBT_BASE(c)+LBT_POS(&c->dct[c->ent-1]),
+ ba->key.len = c->dct[c->ent-1].kln );
+ }
+ if ( !len )
+ return log_msg( ERR_TRASH, "level %u was empty", lev );
+ /* close level */
+ log_msg( LOG_INFO, "closing level %u with %u blocks %u..%u",
+ lev, len, left, b->num );
+ b->lck = 0;
+ if ( 1 == len ) {
+ memcpy( bt->root, b, LBT_BOFF + bt->bsz );
+ b = bt->root;
+ b->num = 0;
+ }
+ if ( WRULK( b, bt ) ) goto nowrite;
+ lev++;
+ if ( 2 > len )
+ break;
+ }
+ log_msg( LOG_INFO,
+ "\tused %u inner blocks %u keys %u bytes avg %.1f\n"
+ "\t%u #spans (%.1f%%) %u pfx (- %.1f%%)",
+ iblk, ikey, ikeyl, (float)ikeyl/ikey,
+ ispn, ispn*100.*bt->vsz/ikeyl, ipfx, ipfx*100./(ipfx+ikeyl) );
+
+ mFree( bt->bat );
+ bt->bat = 0;
+ return 0;
+nowrite:
+ return log_msg( LOG_SYSERR, "\twhen writing batch block" );
+} /* mktree */
+
+
+
+/* ************************************************************
+ package data
+*/
+
+
+
+/* ************************************************************
+ package functions
+*/
+
+int lbt_batch ( Idx *bt, unsigned char pctfree )
+{
+ bt->bat = (Batch*)mAlloc( sizeof(Batch) );
+ if ( ! bt->bat )
+ return log_msg( ERR_NOMEM, "\twhen getting batch" );
+ memset( bt->bat, 0, sizeof(Batch) );
+ if ( pctfree > 50 )
+ pctfree = 50;
+ bt->bat->let = pctfree * bt->bsz / 100;
+ lio_trunc( &bt->fd, 0 );
+ iniblk( bt->root, bt, 0, 0 );
+ /* TODO: kill hash */
+ bt->len = 1;
+ bt->bat->cur = NEWBLK( bt, 0, LBT_EX );
+ return 0;
+} /* lbt_batch */
+
+
+int lbt_batchval ( Idx *bt, Key *key )
+{
+ int cmp = 0;
+ Batch *ba = bt->bat;
+ if ( key != &ba->key ) {
+ int l = key->len < ba->key.len ? key->len : ba->key.len; /* min */
+ if ( ! (cmp = memcmp(key->byt, ba->key.byt, l)) )
+ cmp = (int)key->len - (int)ba->key.len;
+ if ( 0 > cmp )
+ return log_msg( LOG_ERROR, "batch key '%.*s' < last '%.*s'",
+ key->len, key->byt, ba->key.len, ba->key.byt );
+ }
+ /* flush ? */
+ while ( ! key->val.len /* end */
+ || LBT_ENTMAX == ba->got /* full */
+ || cmp /* next key */
+ ) {
+ Block *b = ba->cur;
+ unsigned have = b->stk - LBT_DOFF - b->ent*sizeof(Dict) - ba->let;
+ unsigned need = sizeof(Dict) + ba->key.len;
+ unsigned want = need + ba->got * bt->vsz;
+ unsigned take = ba->got;
+ if ( have < want ) { /* you can't always get ... */
+ if ( have <= need )
+ take = 0;
+ else {
+ take = (have - need) / bt->vsz;
+ if ( 256 > take * bt->vsz )
+ take = 0;
+ }
+ }
+#if 0 /* prefix key turbo ??? -- doesn't seem to work all too well */
+ if ( have < (bt->bsz >> 1) ) { /* ready to waste half a block ? */
+ Dict *d = &b->dct[ b->ent-1 ];
+ unsigned char *last = LBT_BASE(b)+LBT_POS(d);
+ if ( ba->key.byt[0] != last[0]
+ || ba->key.byt[1] != last[1]
+ || ba->key.byt[2] != last[2]
+ )
+ take = 0;
+ }
+#endif
+ LOG_DBG( LOG_DEBUG, "batch '%.*s' got %u need %u want %u have %u take %u",
+ ba->key.len, ba->key.byt, ba->got, need, want, have, take );
+ /* we have enough room in b for take entries */
+ if ( take ) {
+ unsigned char *base = LBT_BASE( b );
+ unsigned char *s = base + b->stk;
+ int i = take;
+ Dict *d = &b->dct[ b->ent++ ];
+ while ( i-- )
+ memcpy( s -= bt->vsz, ba->val[i].byt, bt->vsz );
+ memcpy( s -= ba->key.len, ba->key.byt, ba->key.len );
+ b->stk = (unsigned short) (s - base);
+ LBT_PV( d, b->stk, take );
+ d->kln = ba->key.len;
+ if ( ba->got > take )
+ memcpy( ba->val, ba->val+take, sizeof(ba->val[0])*(ba->got - take) );
+ ba->got -= take;
+ }
+ if ( ! ba->got )
+ break;
+ if ( take ) /* key spans blocks */
+ ba->span++;
+ ba->free += b->stk - LBT_DOFF - b->ent*sizeof(Dict); /* wasted */
+ b->nxt = bt->len; /* link */
+ LOG_DBG( LOG_DEBUG, "batch: %u entries left, getting block %u",
+ ba->got, bt->len );
+ if ( WRULK( b, bt ) )
+ return log_msg( LOG_SYSERR, "\twhen writing batch block" );
+ /* so couldn't flush all, thus we need another block */
+ b = ba->cur = NEWBLK( bt, 0, LBT_EX );
+ /* will continue on end or keychange */
+ if ( b->ent || b->stk < bt->bsz )
+ return log_msg( ERR_TRASH, "new block %u is not empty: ent %u stk %u",
+ b->num, b->ent, b->stk );
+ LOG_DBG( LOG_VERBOSE, "new block %u has %u ent %u stk, will keep %u",
+ b->num, b->ent, b->stk, ba->let );
+ }
+ if ( cmp || ! key->val.len ) /* done with key: stats */
+ if ( ba->maxv < ba->tot )
+ ba->maxv = ba->tot;
+ if ( key->val.len ) {
+ if ( cmp ) { /* set new key */
+ if ( ba->got )
+ return log_msg( ERR_IDIOT, "batch not empty on new key" );
+ ba->tot = 0;
+ memcpy( ba->key.byt, key->byt, ba->key.len = key->len );
+ ba->keys++;
+ ba->keyl += key->len;
+ if ( ba->maxl < key->len )
+ ba->maxl = key->len;
+ }
+ /* add new val */
+ /* LOG_HEX( key->val.byt, bt->vsz ); */
+ memcpy( ba->val[ ba->got++ ].byt, key->val.byt, bt->vsz );
+ ba->vals++;
+ ba->tot++;
+ } else {
+ WRULK( ba->cur, bt );
+ mktree( bt );
+ }
+
+ return 0;
+} /* lbt_batchval */
+
+
+/* TODO: this function is an awful mess and needs complete rewrite, KR
+*/
+int lbt_add ( Idx *bt, Key *key )
+{
+ int ret = 0;
+ unsigned short e, i = 0;
+ Block *b, *s = 0; /* mainblock, sibling */
+ Dict *d;
+ unsigned char *base, *sbase, *src, *v, *stk;
+ FindKey f;
+ int hadkey, split = 0, insibling = 0;
+ unsigned need;
+ PrVal pv;
+
+ f.bt = bt;
+ f.key = key;
+ if ( bt->key && key->len > bt->key )
+ key->len = bt->key;
+ if ( findkey( &f, LBT_WR ) ) {
+ if ( f.b[0] )
+ ULK( f.b[0] );
+ b = 0;
+ LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
+ }
+ e = (LBT_ENTMASK & f.e[0]);
+ b = f.b[0];
+ base = LBT_BASE(b);
+ d = b->dct + e;
+ if ( ! (hadkey = LBT_MATCH & f.e[0]) ) /* new key */
+ need = key->len + sizeof(Dict) + bt->vsz;
+ else {
+ unsigned vln = LBT_VLN(d);
+ v = base+LBT_POS(d)+d->kln;
+ if ( 1 == vln ) { /* check stopword -- value entirely zeroed */
+ int l = bt->vsz;
+ unsigned char *C = v;
+ while ( l-- && !*C++ ) /* huh huh, he said "C++" */
+ ;
+ if ( -1 == l ) {
+ LOG_DBG( LOG_VERBOSE, "key '%.*s' is a stopword", key->len, key->byt );
+ goto done;
+ }
+ }
+ i = valpos( v, vln, bt->vsz, key->val.byt );
+ if ( LBT_MATCH & i ) {
+ LOG_DBG( LOG_DEBUG,
+ "adding key '%.*s'/%s: already there pos %u",
+ key->len, key->byt, PRVAL( pv, key->val ), i & ~LBT_MATCH );
+ goto done; /* value already there */
+ }
+ need = bt->vsz;
+ }
+ LOG_DBG( LOG_DEBUG,
+ "adding key '%.*s'/%s in block %d had %d ent %d stk %d need %d",
+ key->len, key->byt, PRVAL( pv, key->val ),
+ b->num, hadkey, b->ent, b->stk, need );
+
+ if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
+ /* now that's bad: not enough space
+ create new block s, move last entries there
+ starting from the end, entries (including the new one) are collected,
+ until their combined size reaches more than half a block.
+ If the combined size then is more than 2/3 a block,
+ and the last entry alone takes more than 1/3 a block, it is split.
+ TODO: check existing sibling for free space
+ */
+ unsigned half = (bt->bsz - LBT_DOFF) >> 1;
+ unsigned tot = 0;
+ unsigned mv = b->ent;
+
+ LOG_DBG( LOG_DEBUG, "splitting block %d need %d have %d (stk %d ent %d)",
+ b->num, need, b->stk - LBT_DOFF - b->ent*sizeof(Dict), b->stk, b->ent );
+ if ( need > (bt->bsz - LBT_DOFF) / 3 )
+ LOG_OTO( done, (ERR_IDIOT, "OOPS! excessive need for %d bytes", need) );
+ /* figure out split position */
+ if ( !hadkey && b->ent == e ) {
+ tot = need;
+ insibling = !0;
+ }
+ d = b->dct+mv;
+ while ( d--, mv-- ) {
+ unsigned sz = sizeof(Dict)+d->kln+LBT_VLN(d)*bt->vsz;
+ if ( half > (tot += sz) ) { /* sz fits */
+ if ( e == mv && !hadkey ) {
+ /*
+ check size of new entry.
+ In case of hadkey, we don't bother for the bt->vsz bytes.
+ */
+ if ( half < (tot + need) ) {
+ if ( (half - tot) > need/2 ) { /* more balanced if we move */
+ tot += need;
+ insibling = !0;
+ }
+ break;
+ }
+ tot += need;
+ insibling = !0;
+ }
+ if ( mv )
+ continue; /* else croak on ! mv below */
+ }
+ /* exceeded half -- maybe we move entry mv anyway */
+ if ( tot > (bt->bsz - LBT_DOFF)*2/3 ) {
+ if ( tot-sz < (bt->bsz - LBT_DOFF)/3 )
+ split = !0;
+ else { /* undo -- don't move this entry */
+ tot -= sz;
+ d++; mv++;
+ }
+ } else if ( ! mv )
+ LOG_OTO( done, (ERR_IDIOT, "OOPS! didn't find end tot %d bytes", tot) );
+ break;
+ } /* while mv-- */
+ if ( mv < e || (mv == e && hadkey && !split) )
+ insibling = !0;
+ /* now mv is the first entry to move */
+ s = NEWBLK( bt, 0, LBT_WR );
+ LOG_DBG( LOG_DEBUG,
+ "split: move %d of %d to new blk %d e %d tot %d insibling %d split %d",
+ mv, b->ent, s->num, e, tot, insibling, split );
+ s->nxt = b->nxt;
+ b->nxt = s->num;
+ sbase = LBT_BASE(s);
+ if ( split ) {
+ /* now total includes full size of entry mv.
+ if we included a new entry, it might even exceed bsz.
+ calculate sizes for both blocks w/o mv's values.
+ */
+ unsigned esz = sizeof(Dict)+d->kln; /* entry's base cost */
+ unsigned vln = LBT_VLN(d);
+ unsigned keep, diff, mvval, kpval, mvsiz, pos = LBT_POS(d);
+ /*
+ from currently used space, substract tot (including all
+ values and possibly new key "need"), but add entry base cost.
+ that should be the size remaining if we where moving
+ all of the values but keep an entry and key.
+ maybe slightly biased if need is bt->vsz (the hadkey case).
+ */
+ keep = bt->bsz - b->stk + b->ent*sizeof(Dict) + need - tot + esz;
+ tot -= vln * bt->vsz; /* undo entries */
+ diff = (keep > tot ? keep - tot : tot - keep) / bt->vsz;
+ if ( diff > vln )
+ LOG_OTO( done, (ERR_IDIOT,
+ "OOPS! got diff %d vln %d entry %d tot %d keep %d",
+ diff, vln, mv, tot, keep) );
+ mvval = (vln - diff)/2;
+ if ( keep > tot )
+ mvval += diff;
+ kpval = vln - mvval;
+ if ( ! mvval || ! kpval ) {
+ LOG_DBG( LOG_WARN, "bad mv/kpval %d/%d keep %d tot %d vln %d diff %d",
+ mvval, kpval, keep, tot, vln, diff );
+ if ( ! mvval )
+ kpval = vln - (mvval = 1);
+ else
+ mvval = vln - (kpval = 1);
+ }
+ tot += mvsiz = mvval*bt->vsz;
+ keep += kpval*bt->vsz;
+ src = base+pos;
+ LOG_DBG( LOG_DEBUG,
+ "split: entry %d '%.*s' mv %d of %d values, i %d keep %d tot %d",
+ mv, d->kln, src, mvval, vln, i, keep, tot );
+ if ( keep > (bt->bsz-LBT_DOFF) || tot > (bt->bsz-LBT_DOFF) )
+ LOG_OTO( done, (ERR_IDIOT,
+ "OOPS! got diff %d vln %d mvval %d entry %d tot %d keep %d",
+ diff, vln, mvval, mv, tot, keep) );
+ /* SIGH -- do it */
+ memcpy( sbase + (s->stk -= mvsiz),
+ src+d->kln+kpval*bt->vsz, mvsiz );
+ if ( e == mv && hadkey && i > kpval ) {
+ insibling = !0;
+ s->stk -= bt->vsz;
+ i -= kpval;
+ memmove( sbase+s->stk, sbase+s->stk+bt->vsz, i*bt->vsz );
+ memcpy( sbase+s->stk+i*bt->vsz, key->val.byt, bt->vsz );
+ mvval++;
+ }
+ memcpy( sbase + (s->stk -= d->kln), src, d->kln );
+ LBT_PVK( s->dct, s->stk, mvval, d->kln );
+ s->ent = 1;
+ memmove( src + mvsiz, src, d->kln+kpval*bt->vsz );
+ pos += mvsiz;
+ LBT_PV( d, pos, kpval );
+ d++; mv++;
+ }
+ /* now entries mv..b->ent are moved, mv > 0 */
+ b->stk = LBT_POS(b->dct+mv-1); /* pre-adjust stack */
+ if ( mv < e ) { /* move entries mv..e-1 */
+ /* used stack space is from pos of entry e-1 to end of entry mv
+ (might differ from pos of mv-1 due to split)
+ */
+ unsigned from = LBT_POS(b->dct+e-1), nent = e-mv,
+ sz = LBT_POS(b->dct+mv)+b->dct[mv].kln
+ +LBT_VLN(b->dct+mv)*bt->vsz - from;
+ int adj = (s->stk -= sz) - from; /* entry position adjustment */
+ memcpy( sbase + s->stk, base+from, sz );
+ memcpy( s->dct+s->ent, b->dct+mv, nent*sizeof(Dict) );
+ d = s->dct + s->ent;
+ s->ent += nent;
+ for ( ; nent--; d++ )
+ LBT_POSADD( d, adj );
+ }
+ if ( insibling && mv <= e ) { /* now mv or insert e, unless done on split */
+ d = b->dct + e;
+ if ( hadkey ) { /* mv existing entry e >= mv, add new val */
+ /* i is valpos */
+ unsigned vln = LBT_VLN(d);
+ src = base+LBT_POS(d)+d->kln;
+ if ( i < vln ) {
+ unsigned sz = (vln - i)*bt->vsz;
+ memcpy( sbase + (s->stk -= sz), src + i*bt->vsz, sz );
+ }
+ memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
+ if ( i ) {
+ unsigned sz = i*bt->vsz;
+ memcpy( sbase + (s->stk -= sz), src, sz );
+ }
+ memcpy( sbase + (s->stk -= d->kln), base+LBT_POS(d), d->kln );
+ LBT_PVK( s->dct + s->ent, s->stk, vln+1, d->kln );
+ e++; /* e is moved */
+ } else {
+ memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
+ memcpy( sbase + (s->stk -= key->len), key->byt, key->len );
+ LBT_PVK( s->dct + s->ent, s->stk, 1, key->len );
+ }
+ s->ent++;
+ } /* mv or insert e */
+ if ( e < b->ent && mv < b->ent ) { /* move entries max(e,mv) to end */
+ unsigned fst = mv < e ? e : mv;
+ /* used stack space is from pos of entry b->ent-1 (the original b->stk)
+ to end of entry fst (might differ from pos of fst-1 due to split)
+ */
+ unsigned from = LBT_POS(b->dct+b->ent-1), nent = b->ent-fst,
+ sz = LBT_POS(b->dct+fst)+b->dct[fst].kln
+ +LBT_VLN(b->dct+fst)*bt->vsz - from;
+ int adj = (s->stk -= sz) - from; /* entry position adjustment */
+ LOG_DBG( LOG_DEBUG, "split: entries %d..%d sz %d from %d to %d",
+ fst, b->ent-1, sz, from, s->stk );
+ memcpy( sbase + s->stk, base+from, sz );
+ memcpy( s->dct+s->ent, b->dct+fst, nent*sizeof(Dict) );
+ d = s->dct + s->ent;
+ s->ent += nent;
+ for ( ; nent--; d++ ) {
+#if !defined(NDEBUG)
+ unsigned p = LBT_POS(d);
+#endif
+ LBT_POSADD( d, adj );
+#if !defined(NDEBUG)
+ LOG_DBG( LOG_DEBUG, "new ent %d p/v/k %d/%d/%d o %d",
+ d - s->dct, LBT_POS(d), LBT_VLN(d), d->kln, p );
+#endif
+ }
+ }
+ b->ent = mv; /* kill entries */
+ LOG_DBG( LOG_DEBUG,
+ "split: blks %d/%d with %d/%d entries, s starts '%.*s'",
+ b->num, s->num, b->ent, s->ent, s->dct[0].kln, sbase+LBT_POS(s->dct) );
+ } /* new sibling */
+
+ if ( ! insibling ) {
+ /* now we do have enough space */
+ d = b->dct + e;
+ stk = base + b->stk; /* bottom of data stack */
+ if ( hadkey ) { /* insert value at i */
+ unsigned char *val = base + LBT_POS(d)+d->kln+i*bt->vsz;
+ need = bt->vsz;
+#if 0
+ LOG_DBG( LOG_DEBUG,
+ "in entry %d '%.*s' val %d: memmove( %p %p %d) val %p need %d",
+ e, d->kln, base+LBT_POS(d), i,
+ stk-need, stk, (val-stk)+need, val, need );
+#endif
+ memmove( stk - need, stk, (val - stk)+need );
+ memcpy( val - need, key->val.byt, bt->vsz );
+ LBT_VLNADD( d, 1 );
+ } else { /* insert key, value and dict entry */
+ /* take dict and data space from entry e */
+ unsigned char *nu;
+ need = key->len + bt->vsz;
+ if ( e == b->ent )
+ nu = base + b->stk - need;
+ else {
+ /* at end of next entry, i.e. end of block or beg of previous */
+ nu = base + LBT_POS(d) + d->kln + LBT_VLN(d)*bt->vsz;
+ memmove( stk - need, stk, nu - stk );
+ nu -= need;
+ memmove( d+1, d, (b->ent - e)*sizeof(Dict) );
+ }
+ memcpy( nu, key->byt, key->len );
+ memcpy( nu + key->len, key->val.byt, bt->vsz );
+ LBT_PV( d, nu-base, 1 );
+ d->kln = key->len;
+ d++;
+ e++;
+ b->ent++;
+ }
+ /*
+ need is set to the needed (i.e. additionally used) data space,
+ e is the first dict entry to redirect need bytes lower
+ */
+ b->stk -= need;
+ for ( ; e < b->ent; e++, d++ )
+ LBT_POSADD( d, -need );
+ } /* ! insibling */
+ while ( s ) { /* created sibling */
+ unsigned num = s->num;
+ unsigned char *e0 = LBT_BASE(s) + LBT_POS(s->dct);
+ unsigned l = s->lev + 1;
+ Block *ins;
+ Key k;
+
+ memcpy( k.byt, e0, k.len = s->dct[0].kln );
+ k.val.len = 0;
+ if ( s->lev ? s->dct[0].vln : split )
+ memcpy( k.val.byt, e0+k.len, k.val.len = bt->vsz );
+ LOG_DBG( LOG_DEBUG, "split: had %d/%d on lev %d key '%.*s' vln %d %s",
+ b->num, num, s->lev, k.len, k.byt, k.val.len, PRVAL( pv, k.val ) );
+
+ /* get parent */
+ if ( f.b[l]->num == f.n[l] && ! LBT_BUSY(f.b[l]) )
+ f.b[l]->lck = LBT_WR;
+ else
+ f.b[l] = getblk( bt, f.n[l], l, LBT_WR );
+ LOG_DBG( LOG_DEBUG,
+ "split: got parent %d lev %d", f.b[l]->num, f.b[l]->lev );
+ /* MT TODO: again L/Y link hunt */
+ /* ok, locked the parent -- release s and b */
+ if ( (ret = WRULK( s, bt )) || (ret = WRULK( b, bt )) )
+ break;
+ s = 0;
+ ins = b = f.b[l];
+ base = LBT_BASE(b);
+ e = KEYPOS( bt, b, &k );
+ LOG_DBG( LOG_DEBUG, "split: got parent pos %d", e );
+ if ( LBT_MATCH & e ) /* !??? */
+ LOG_OTO( done, (ERR_TRASH, "OOPS! found key in parent" ) );
+ if ( ! e ) {
+ TRACEKEY( ERR_TRASH, &f );
+ LOG_OTO( done, (ERR_TRASH, "OOPS! parent insert at 0 lev %d", l ) );
+ }
+ need = sizeof(Dict) + k.len + 4 + k.val.len;
+ if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
+ unsigned half = ((bt->bsz - b->stk)+b->ent*sizeof(Dict)+need) / 2;
+ unsigned mv = b->ent, tot = 0, nent, sz;
+ int adj;
+ if ( e == b->ent ) {
+ tot = need;
+ ins = 0;
+ }
+ for ( d = b->dct+mv; d--, mv--; ) {
+ sz = sizeof(Dict) + d->kln + 4 + d->vln;
+ if ( half > (tot += sz) ) {
+ if ( e == mv ) {
+ if ( half > tot+need || tot+need-half < half-tot ) {
+ tot += need;
+ ins = 0; /* new link goes to sibling */
+ } else
+ break;
+ if ( half <= tot )
+ break;
+ }
+ if ( mv )
+ continue;
+ }
+ if ( tot - half > sz/2 ) { /* don't move this */
+ tot -= sz;
+ d++; mv++;
+ } else if ( ! mv ) /* !??? */
+ mv = 1;
+ break;
+ }
+ /* split here a little bit simpler */
+ s = NEWBLK( bt, l, LBT_WR );
+ s->nxt = b->nxt;
+ b->nxt = s->num;
+ /* straight move entries mv..b->ent-1 to sibling */
+ sbase = LBT_BASE(s);
+ d = b->dct + mv;
+ sz = b->dct[mv-1].pos - b->stk;
+ nent = b->ent - mv;
+ LOG_DBG( LOG_DEBUG,
+ "split: parent lev %d split %d to new %d mv %d"
+ " '%.*s'/%s nent %d sz %d",
+ l, b->num, s->num, mv, d->kln, base+d->pos,
+ prval( pv, base+d->pos+d->kln, d->vln ), nent, sz );
+ memcpy( sbase + (s->stk -= sz), base + b->stk, sz );
+ adj = s->stk - b->stk;
+ b->stk += sz;
+ memcpy( s->dct, d, nent*sizeof(Dict) );
+ b->ent = mv;
+ s->ent = nent;
+ for ( d = s->dct; nent--; d++ )
+ d->pos += adj;
+ if ( ! ins ) {
+ ins = s;
+ e -= mv;
+ }
+ if ( (ret = chkblk( s, bt )) )
+ break;
+ }
+ /* now insert link in block ins at pos e */ {
+ unsigned isz = k.len + 4 + k.val.len, nent = ins->ent - e;
+ unsigned char *ibase = LBT_BASE(ins);
+ unsigned end = e ? ins->dct[e-1].pos : bt->bsz;
+ LOG_DBG( LOG_DEBUG,
+ "split: ins parent %d lev %d pos %d key '%.*s' -> %d vln %d %s",
+ ins->num, ins->lev, e, k.len, k.byt, num, k.val.len,
+ PRVAL( pv, k.val ) );
+ memmove( ibase + ins->stk - isz, ibase + ins->stk, end - ins->stk );
+ ins->stk -= isz;
+ memcpy( ibase + (end -= 4), &num, 4 );
+ if ( k.val.len )
+ memcpy( ibase + (end -= k.val.len), k.val.byt, k.val.len );
+ memcpy( ibase + (end -= k.len), k.byt, k.len );
+ for ( d = ins->dct+ins->ent; nent--; d-- ) {
+ *d = d[-1];
+ d->pos -= isz;
+ }
+ d->pos = end;
+ d->vln = k.val.len;
+ d->kln = k.len;
+ ins->ent++;
+ if ( (ret = chkblk( ins, bt )) )
+ break;
+ }
+ if ( ! s ) /* write b below or in next turn */
+ break;
+ if ( ! b->num ) { /* root split */
+ Block *bb = NEWBLK( bt, l, LBT_WR );
+ LOG_DBG( LOG_DEBUG, "root split!" );
+ /* cp all but the num */
+ memcpy( LBT_BASE(bb)+4, LBT_BASE(b)+4, bt->bsz-4 );
+ b->nxt = s->nxt; /* restore root's special nxt */
+ s->nxt = 0;
+ b->stk = bt->bsz; /* empty the root */
+ /* add two childs bb and s */
+ SETINT( base + (b->stk -= 4), bb->num );
+ LBT_PVK( b->dct, b->stk, 0, 0 );
+ SETINT( base + (b->stk -= 4), s->num );
+ e0 = LBT_BASE(s) + LBT_POS(s->dct);
+ if ( s->dct[0].vln )
+ memcpy( base + (b->stk -= bt->vsz), e0+s->dct[0].kln, bt->vsz );
+ memcpy( base + (b->stk -= s->dct[0].kln), e0, s->dct[0].kln );
+ LBT_PVK( b->dct+1, b->stk, s->dct[0].vln, s->dct[0].kln );
+ b->ent = 2;
+ b->lev++; /* increase tree depth */
+ if ( (ret = WRULK( s, bt )) || (ret = WRULK( bb, bt )) )
+ break;
+ s = 0;
+ break;
+ }
+ }
+ if ( b && !ret )
+ ret = WRULK( b, bt );
+done:
+ if ( s )
+ ULK( s );
+ if ( b )
+ ULK( b );
+ if ( !ret && b->lev && findkey( &f, LBT_RD ) )
+ ret = log_msg( ERR_TRASH, "could not find key after add" );
+ return ret;
+} /* lbt_add */
+
+
+int lbt_del ( Idx *bt, Key *key )
+{
+ int ret = 0;
+ unsigned short e, n, i, mv;
+ Block *b;
+ Dict *d;
+ unsigned char *base, *v;
+ FindKey f;
+ PrVal pv;
+
+ f.bt = bt;
+ f.key = key;
+ LOG_DBG( LOG_DEBUG, "deleting key '%.*s'/%s",
+ key->len, key->byt, PRVAL( pv, key->val ) );
+ if ( findkey( &f, LBT_WR ) )
+ LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
+ if ( !(LBT_MATCH & f.e[0]) ) /* key not found - ok */
+ goto done;
+ LOG_DBG( LOG_DEBUG, "key found" );
+ e = (LBT_ENTMASK & f.e[0]);
+ b = f.b[0];
+ d = b->dct + e;
+ base = LBT_BASE(b);
+ v = base+LBT_POS(d)+d->kln; /* start of values */
+ n = LBT_VLN(d);
+ if ( ! n )
+ goto done;
+ i = valpos( v, n, bt->vsz, key->val.byt );
+ LOG_DBG( LOG_DEBUG, "val %sfound pos %u of %u",
+ (LBT_MATCH & i) ? "" : "not ", i & ~LBT_MATCH, n );
+ if ( !(LBT_MATCH & i) ) /* val not found - ok */
+ goto done;
+ i &= ~LBT_MATCH;
+ LOG_DBG( LOG_DEBUG,
+ "del blk %d e %d i/n %d/%d", b->num, e, i, n );
+ if ( 1 < n ) {
+ v += i*bt->vsz; /* end of stack area to move: start of this value */
+ mv = bt->vsz; /* amount to move: one value size */
+ LBT_VLNADD( d, -1 );
+ } else { /* sole value -- rm whole entry */
+ v = base+LBT_POS(d); /* start of key */
+ mv = d->kln+bt->vsz;
+ for ( i=b->ent - e - 1; i--; d++ ) /* move the i entries after d down */
+ *d = d[1];
+ d = b->dct + e; /* reset */
+ b->ent--;
+ }
+ memmove( base+b->stk+mv, base+b->stk, v-(base+b->stk) );
+ b->stk += mv;
+ for ( ; e++ < b->ent; d++ )
+ LBT_POSADD( d, mv );
+ WRULK( b, bt );
+done:
+ if ( f.b[0] )
+ ULK( f.b[0] );
+ return ret;
+} /* lbt_del */
+
+
+
+int lbt_loop ( Idx *bt, DXLoop *l )
+{
+ int ret = 0;
+ unsigned i = 0;
+ int mode = IDXMODE & l->flg;
+ Block *b = 0;
+ Key *cmp = &l->key;
+ if ( ! cmp->len ) /* start at 1st leaf */
+ b = getblk( bt, 1, 0, LBT_RD );
+ else { /* locate key */
+ FindKey f;
+ f.bt = bt;
+ f.key = cmp;
+ if ( findkey( &f, LBT_RD ) )
+ LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
+ b = f.b[0];
+ i = (unsigned)(LBT_ENTMASK & f.e[0]);
+ if ( !(LBT_MATCH & f.e[0]) && IDXEQ == mode )
+ goto done;
+ }
+ if ( IDXUPTO <= mode )
+ cmp = &l->to;
+ for ( ;; i=0 ) { /* blocks */
+ unsigned char *base = LBT_BASE( b );
+ Dict *d;
+ if ( ! b || b->lev )
+ return -ERR_TRASH;
+ b->lck = LBT_EX; /* lock */
+ d = b->dct + i;
+ for ( ; i<b->ent; i++, d++ ) {
+ unsigned short pos = LBT_POS( d );
+ unsigned short vln = LBT_VLN( d );
+ unsigned short kln = d->kln;
+ unsigned short j;
+ unsigned char *v = base+pos+kln;
+ Key key;
+ Hit hit;
+ int diff = memcmp( cmp->byt, base+pos,
+ kln <= cmp->len ? kln : cmp->len );
+ switch ( mode ) {
+ case IDXPF:
+ if ( diff || kln < cmp->len )
+ goto moan;
+ break;
+ case IDXEQ:
+ if ( diff || kln != cmp->len )
+ goto done;
+ break;
+ case IDXUPTO:
+ if ( 0 >= diff )
+ goto done;
+ break;
+ case IDXINCL:
+ if ( 0 > diff || (!diff && kln > cmp->len) )
+ goto done;
+ break;
+ default: /* unused */
+ moan:
+ log_msg( LOG_INFO,
+ "loop ends on key '%.*s'(%d) %d %d",
+ kln, base+pos, kln, diff, cmp->len );
+ goto done;
+ }
+ memcpy( key.byt, base+pos, key.len = kln );
+ if ( l->cb )
+ for ( j=0; j<vln; j++, v += bt->vsz ) {
+ rdval( &hit, v, bt->vsz );
+ ret = l->cb( l->me, &key, &hit );
+ if ( ret )
+ goto done;
+ }
+ else { /* internal check */
+ FindKey f;
+ f.bt = bt;
+ f.key = &key;
+ memcpy( key.val.byt, v, key.val.len = bt->vsz );
+ if ( findkey( &f, LBT_RD ) )
+ LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
+ if ( !(LBT_MATCH & f.e[0])
+ || b != f.b[0]
+ || i != (unsigned)(LBT_ENTMASK & f.e[0])
+ ) {
+ log_msg( ERR_TRASH,
+ "OOPS! want %u(%p).%u got %u(%p).%u (match %x)",
+ b->num, b, i, f.b[0]->num, f.b[0], LBT_ENTMASK & f.e[0], f.e[0] );
+ rdval( &hit, v, bt->vsz );
+ log_msg( ERR_TRASH,
+ "\twhen looking for '%.*s'/%u.%u.%u.%u",
+ key.len, key.byt, hit.mfn, hit.tag, hit.occ, hit.pos );
+ if ( f.b[0]->ent <= (LBT_ENTMASK & f.e[0]) )
+ log_msg( ERR_TRASH, "\tI found nothing" );
+ else {
+ Dict *dd = f.b[0]->dct + (LBT_ENTMASK & f.e[0]);
+ unsigned char *bb = LBT_BASE(f.b[0])+LBT_POS(dd);
+ rdval( &hit, bb+dd->kln, bt->vsz );
+ log_msg( ERR_TRASH,
+ "\tI found '%.*s'/%u.%u.%u.%u",
+ dd->kln, bb, hit.mfn, hit.tag, hit.occ, hit.pos );
+ }
+ } else
+ log_msg( LOG_TRACE, "key ok" );
+ }
+ }
+ ULK( b );
+ if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
+ break;
+ }
+done:
+ if ( b )
+ ULK( b );
+ if ( l->cb )
+ l->cb( l->me, 0, 0 );
+ else
+ log_msg( LOG_INFO,
+ "checked %i keys in %u blks of %dK depth %u cmp/get/miss %u/%u/%u",
+ stat.key, bt->len, bt->bsz>>10, bt->root->lev, stat.cmp, stat.get, stat.miss );
+ return ret;
+} /* lbt_loop */
+
+
+/* mimik the plain old ldb_search */
+int lbt_search ( Idx *bt, Key *key, LdbPost *post, Rec *rec )
+{
+ int ret = 0;
+ unsigned i = 0;
+ int cont = rec && rec->len; /* continuing previous terms search */
+ char *stk = rec ? ((char*)rec + rec->bytes) : 0; /* term stack */
+ int prefix;
+ Block *b = 0;
+ Key start;
+ LdbP * const p = post->p; /* really just an alias */
+
+ /* check for prefix match */
+ if ( post ) {
+ if ( 8 != bt->vsz )
+ return log_msg( ERR_INVAL, "bad legacy search on vsz %d", bt->vsz );
+ prefix = LDB_PFX & post->mode;
+ } else {
+ if ( ! rec )
+ return log_msg( ERR_INVAL, "lbt_search needs rec or post" );
+ if ( (prefix = key->len && '$' == key->byt[key->len - 1]) )
+ key->len--;
+ }
+
+ if ( ! cont )
+ start = *key;
+ else {
+ memset( &start, 0, sizeof(start) );
+ start.len = 255 < rec->field[rec->len-1].len
+ ? 255 : rec->field[rec->len-1].len;
+ memcpy( start.byt, rec->field[rec->len-1].val, start.len );
+ rec->len = 0;
+ }
+
+ if ( ! start.len ) /* start at 1st leaf */
+ b = getblk( bt, 1, 0, LBT_RD );
+ else { /* locate key */
+ FindKey f;
+ f.bt = bt;
+ f.key = &start;
+ if ( findkey( &f, LBT_RD ) )
+ LOG_OTO( ouch, ( ERR_TRASH, "could not find key" ) );
+ b = f.b[0];
+ i = (unsigned)(LBT_ENTMASK & f.e[0]);
+ if ( !(LBT_MATCH & f.e[0]) && ! prefix )
+ goto done;
+ }
+
+ for ( ;; i=0 ) { /* blocks */
+ unsigned char *base = LBT_BASE( b );
+ Dict *d;
+ if ( ! b || b->lev )
+ return -ERR_TRASH;
+ b->lck = LBT_EX; /* lock */
+ d = b->dct + i;
+ for ( ; i<b->ent; i++, d++ ) { /* entries */
+ unsigned char *ent = base + LBT_POS( d ); /* start of key */
+ unsigned short kln = d->kln;
+ if ( memcmp( key->byt, ent, kln <= key->len ? kln : key->len )
+ || (prefix ? kln < key->len : kln != key->len )
+ )
+ goto done;
+ if ( rec ) { /* record term */
+ Field *f = rec->field + rec->len; /* field to assign */
+ /** append term to record, unless it's the same as previous */
+ if ( rec->len ) {
+ if ( kln == f[-1].len && !memcmp( f[-1].val, ent, kln ) )
+ continue;
+ } else if ( cont ) {
+ if ( kln == start.len && !memcmp( start.byt, ent, kln ) )
+ continue;
+ }
+ stk -= kln;
+ if ( stk < (char*)(f+1) ) /* stack reached field dict */
+ goto done;
+ memcpy( stk, ent, kln );
+ f->tag = 0;
+ f->val = stk;
+ f->len = kln;
+ rec->len++;
+ continue;
+ }
+ { /* more locals */
+ LdbP merge[1024];
+ unsigned short vln, k, m=0;
+ unsigned char *c;
+ int f = post->fil-1; /* highest pos to consider in given postings */
+ if ( !(vln = LBT_VLN( d )) )
+ continue;
+ ent += kln; /* set to start of values */
+ c = ent + 8*(vln-1); /* last value */
+ /* collect postings */
+ for ( k=vln; k--; c-=8 ) {
+ /* loop backwards (for the fun of it) postings in segment */
+ int prow, ptag, ppos;
+ LdbP e; /* the entry */
+ LdbP samerow; /* highest possible entry w/ same row as e */
+#if defined( LDB_BIG_ENDIAN )
+ /* the 8 bytes of a posting are BIG ENDIAN ! */
+ memcpy(e.bytes,c,8);
+#else
+ e.bytes[0] = c[7]; e.bytes[1] = c[6];
+ e.bytes[2] = c[5]; e.bytes[3] = c[4];
+ e.bytes[4] = c[3]; e.bytes[5] = c[2];
+ e.bytes[6] = c[1]; e.bytes[7] = c[0];
+#endif
+ prow = LDBP_ROW( &e );
+ ptag = LDBP_TAG( &e );
+ ppos = LDBP_POS( &e );
+ LOG_DBG( LOG_VERBOSE, "post %d.%hd pos %06x", prow, ptag, ppos );
+ if ( !prow )
+ continue;
+ if ( (post->cut && prow >= post->cut)
+ || (post->tag && post->tag != ptag)
+ )
+ continue;
+ if ( prow < post->skp ) /* quickly bail out on skip condition */
+ break;
+ LDBP_SETROWTOP( &samerow, &e ); /* for mfn comparison */
+ /* sweep down to postings for the same row as e ... */
+ while ( f >= 0 && LDBP_GT( p+f, &samerow ) )
+ f--;
+ if ( LDB_AND & post->mode ) {
+ int l;
+ /* loop postings for same row, mark all (that are near enough) */
+ LDBP_SETROWBOT( &samerow, &e ); /* for mfn comparison */
+ /* NOTE: postings for row are GT than bottom even if marked */
+ for ( l = f; l>=0 && LDBP_GT( p+l, &samerow ); l-- ) {
+ if ( post->near ) {
+ int dist;
+ if ( ptag != LDBP_TAG( p+l ) ) continue;
+ if ( LDB_NEAR_G != post->near ) {
+ dist = LDBP_POS( p+l ) - LDBP_POS( &e );
+ if ( dist < 0 ) dist = -dist;
+ if ( 0 < post->near
+ ? post->near < dist
+ : -post->near != dist /* exact $$$$ */
+ ) continue;
+ }
+ }
+ LDBP_SETMARK( p+l );
+ }
+ } else { /* OR mode */
+ int add;
+ if ( ! post->near ) /* add if row not found: ignore details */
+ add = 0 > f || prow > LDBP_ROW( p+f );
+ else { /* add if no exact match */
+ int l;
+ /* NOTE: we don't use mark bit in OR mode, do we ? */
+ for ( l = f; l>=0 && LDBP_GT( p+l, &e ); l-- )
+ ;
+ add = 0 > l || LDBP_GT( &e, p+l );
+ }
+ if ( add )
+ merge[ m++ ] = e;
+ }
+ } /* for postings in segment */
+ if ( m ) { /* merge in the merge buffer */
+ LdbP *mm = merge;
+ for ( k = post->fil += m; m && k--; ) {
+ LdbP src;
+ if ( k < m || LDBP_GT( mm, &p[k-m] ) ) {
+ src = *mm++;
+ m--;
+ LOG_DBG( LOG_DEBUG, "merging %d at %d", LDBP_ROW(&src), k );
+ } else
+ src = p[k-m];
+ if ( k < post->len )
+ p[k] = src;
+ else { /* set cut */
+ int row = LDBP_ROW( &src );
+ if ( row < post->cut || !post->cut )
+ post->cut = row;
+ }
+ }
+ if ( post->fil > post->len )
+ post->fil = post->len;
+ if ( post->cut ) /* postings for cut row are unreliable */
+ while ( post->fil && post->cut <= LDBP_ROW(p+post->fil-1) )
+ post->fil--;
+ } /* if ( m ) */
+ } /* more locals */
+ } /* for entries in block */
+ ULK( b );
+ if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
+ break;
+ }
+done:
+ if ( rec )
+ ret = rec->len;
+ouch:
+ if ( b )
+ ULK( b );
+ return ret;
+} /* lbt_search */
+
+
+int lbt_init ( Idx *bt )
+{
+ int ret = 0, blkbits, i;
+ Block *l;
+ int size = lio_size( bt->fd );
+
+ bt->root = 0;
+ bt->hash = 0;
+ bt->mem = 0;
+ for ( i=LBT_LRULEV; i--; )
+ bt->lru[i] = bt->mru[i] = 0;
+
+ if ( 0x3ff & size ) /* not on 1K boundary */
+ return log_msg( LOG_ERROR, "bt has bad size %dK + %d",
+ size >> 10, (int)size & 0x3ff );
+ if ( size ) { /* read root header */
+ Block b;
+ unsigned want = sizeof(b) - LBT_BOFF;
+ int got = lio_pread( &bt->fd, LBT_BASE(&b), want, 0 );
+ if ( want != (unsigned)got )
+ return log_msg( LOG_SYSERR, "could not get root head" );
+ if ( b.num )
+ return log_msg( ERR_TRASH, "root has num %d", b.num );
+ /*if ( LBT_ISIS != b.nxt ) log_msg( LOG_WARN, "magic is 0x%08x", b.nxt );*/
+ if ( bt->typ && bt->typ != b.typ )
+ log_msg( LOG_WARN, "typ want 0x02%x got 0x02%x", bt->typ, b.typ );
+ bt->typ = b.typ;
+ if ( bt->key && bt->key != b.key )
+ log_msg( LOG_WARN, "key want %d got %d", bt->key, b.key );
+ bt->key = b.key;
+ if ( bt->col && bt->col != b.col )
+ log_msg( LOG_WARN, "col want %d got %d", bt->col, b.col );
+ bt->col = b.col;
+ bt->dpt = b.lev;
+ /*
+ b.ent
+ b.stk
+ */
+ } else { /* new index */
+ bt->dpt = 1;
+ if ( ! bt->key )
+ bt->key = 30;
+ }
+
+ bt->vsz = 8+(0xf & bt->typ);
+ blkbits = 10 + (3 & (bt->typ >> 4));
+ bt->bsz = 1 << blkbits;
+ if ( size % bt->bsz )
+ return log_msg( ERR_TRASH, "size 0x%08x % blksize", size, bt->bsz );
+ bt->len = size >> blkbits;
+ if ( 2 > bt->len && !(LBT_WRITE & bt->flg) )
+ return log_msg( LOG_SYSERR, "attempt to read empty index" );
+
+ bt->hlen = 16 + bt->len / 32; /* initial cache size */
+ if ( bt->hlen < 1000 ) bt->hlen = 1000;
+ if ( !(bt->root = addchunk( bt, bt->hlen )) )
+ return log_msg( LOG_SYSERR, "\twhen getting bt cache" );
+ if ( bt->hlen < 256 ) /* the hash */
+ bt->hlen = 256;
+ if ( !(bt->hash = (Block**)mAlloc( bt->hlen * sizeof(Block*) )) )
+ return log_msg( LOG_SYSERR, "\twhen getting bt hash" );
+ memset( bt->hash, 0, bt->hlen * sizeof(Block*) );
+
+ if ( size && readblk( bt->root, bt, 0 ) )
+ return log_msg( ERR_TRASH, "could not read root" );
+ if ( ! bt->len ) {
+ /* init: link root to leaf */
+ SETINT( LBT_BASE(bt->root)+(bt->root->stk -= 4), 1 );
+ LBT_PVK( bt->root->dct, bt->root->stk, 0, 0 );
+ bt->root->ent = 1;
+ bt->len = 1;
+ }
+ if ( !(l = getblk( bt, 1, 0, LBT_EX )) )
+ return log_msg( ERR_TRASH, "could not read 1st leaf" );
+ if ( ! size ) {
+ log_msg( LOG_INFO, "creating index bsz %u ksz %u vsz %u",
+ bt->bsz, bt->key, bt->vsz );
+ bt->root->lev = 1;
+ if ( WRULK( bt->root, bt ) || WRULK( l, bt ) )
+ return log_msg( LOG_SYSERR, "\twhen writing base" );
+ }
+
+ return ret;
+} /* lbt_init */
+
+
+void lbt_close ( Idx *bt )
+{
+ Chunk *c, *n = bt->mem;
+ while ( (c = n) ) {
+ n = c->nxt;
+ mFree( c );
+ }
+ mFree( bt->hash );
+ lio_close( &bt->fd, LIO_INOUT );
+ memset( bt, 0, sizeof(*bt) );
+} /* lbt_close */
+
+
+/* ************************************************************
+ public functions
+*/
+
+
+void cXMkVal ( Idx *bt, Val *val, Hit *hit )
+{
+ if ( hit ) {
+ val->len = bt->vsz;
+ mkval( val, hit );
+ } else
+ val->len = 0;
+}
+
+int cXAdd ( Idx *bt, Key *key, Hit *hit )
+{
+ if ( bt->bat ) {
+ if ( ! key )
+ key = &bt->bat->key;
+ cXMkVal( bt, &key->val, hit );
+ return lbt_batchval( bt, key );
+ } else if ( !key )
+ return 0;
+ else if ( hit ) {
+ cXMkVal( bt, &key->val, hit );
+ if ( hit->dbn == 0xffff )
+ return lbt_del( bt, key );
+ }
+ return lbt_add( bt, key );
+} /* cXAdd */