2 openisis - an open implementation of the CDS/ISIS database
3 Version 0.8.x (patchlevel see file Version)
4 Copyright (C) 2001-2003 by Erik Grziwotz, erik@openisis.org
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2.1 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 see README for more information
24 $Id: lbt.c,v 1.35 2003/05/27 11:03:30 kripke Exp $
32 #if defined( __GNUC__ ) && defined ( alloca )
36 #if defined( sparc ) || defined( __ppc__ )
37 # define LDB_BIG_ENDIAN
46 probably all relevant cases can be handled by the more
47 efficient approach of storing recoded key a la strxfrm
52 /* ************************************************************
57 /* little endian magic text */
58 LBT_OPEN = 'O' | 'P'<<8 | 'E'<<16 | 'N'<<24,
59 LBT_ISIS = 'I' | 'S'<<8 | 'I'<<16 | 'S'<<24
64 LBT_LEVELS = 16, /* max depth */
65 LBT_POSBITS = 13, /* bits for position within block */
67 LBT_POSMAX = LBT_POSMASK /* obviously :) */
70 typedef struct Dict { /* dictionary of block entries */
71 unsigned short pos; /* 13 bits position of entry, 2 bits val for leaves */
72 unsigned char vln; /* length of entry's value */
73 /* for inner nodes, this is the length of qualifying
74 second key segment (either 0 or bt->vsz <= 23).
75 for leaves, this, together with higher bits of pos,
76 is the number of values of bt->vsz each.
78 unsigned char kln; /* length of entry's key */
80 the following access macros are needed for leaves only
82 #define LBT_POS( d ) (LBT_POSMASK & (d)->pos)
83 #define LBT_VLN( d ) ((d)->vln | (~LBT_POSMASK & (d)->pos)>>(LBT_POSBITS-8))
84 #define LBT_POSADD( d, a ) do { \
85 (d)->pos = (~LBT_POSMASK & (d)->pos) | (LBT_POSMASK & ((d)->pos + (a))); \
87 #define LBT_VLNADD( d, a ) do { \
88 unsigned _vln = LBT_VLN( d ) + (a); \
89 (d)->pos = (LBT_POSMASK & (d)->pos) \
90 | (~LBT_POSMASK & ((_vln)<<(LBT_POSBITS-8))); \
91 (d)->vln = (unsigned char)(_vln); \
93 #define LBT_PV( d, p, v ) do { \
94 (d)->pos = (LBT_POSMASK & (p)) | (~LBT_POSMASK & ((v)<<(LBT_POSBITS-8))); \
95 (d)->vln = (unsigned char)(v); \
97 #define LBT_PVK( d, p, v, k ) do { \
99 (d)->kln = (unsigned char)(k); \
104 /* guard space around block */
106 # define LBT_GUARD 64
107 /* template for guard space -- blank or pattern */
108 static unsigned char guard[LBT_GUARD];
115 TODO: check the possible performance gain in findkey
116 from separating disk block and administrative data
117 and aligning disk blocks on PAGE boundaries in memory
119 typedef struct Block {
121 unsigned char grd[LBT_GUARD];
123 unsigned char ird; /* is reading */
124 unsigned char wrd; /* #threads waiting for read */
125 unsigned char lck; /* is write locked */
126 unsigned char wlk; /* #threads waiting for write lock */
127 #define LBT_BUSY( b ) (*(int*)&(b)->ird) /* any of 1st 4 bytes != 0 */
128 struct Block *nhs; /* next in hash chain */
129 struct Block *nru; /* next (more) recently used */
130 /* TODO: possibly double-link lru list, so we can unlink on every access ? */
131 /* the disk block -- from here on bsz bytes are on disk */
132 #define LBT_BOFF ((char *)&((Block*)0)->num - (char*)0)
133 #define LBT_BASE( b ) ((unsigned char *)&(b)->num)
134 unsigned num; /* number of this block, 0 is root */
135 unsigned nxt; /* right sibling, "OPEN" or "ISIS" for root */
136 /* typ,key,col are redundant in non-root blocks, maybe abused ... */
137 unsigned char typ; /* type: bsz, vsz, flags */
138 unsigned char key; /* max key length */
139 unsigned char col; /* collation */
140 unsigned char lev; /* level over bottom, 0 for leaves */
141 unsigned short ent; /* 10 bits # entries */
142 #define LBT_ENT( b ) (b)->ent /* currently other bits arent abused */
143 unsigned short stk; /* 13 bits offset of keyspace stack */
144 /* offset of dict RELATIVE to base */
145 #define LBT_DOFF ((char *)&((Block*)0)->dct - (char*)&((Block*)0)->num)
151 dct is an array of dictionary entries in increasing key order
152 stk is growing downwards from block end towards dict
153 position of entry i is b->stk <= LBT_POS(b->dct+i) < bt->bsz
154 decreasing on i (i.e. stack space is in reverse ordering)
156 at LBT_POS(b->dct+i), there are b->dct[i].kln bytes key
157 for a leaf entry, the key is followed by
158 LBT_VLN(b->dct+i) (between 0..1023 incl.) values
159 of bt->vsz (between 8..23 incl) each
160 for an inner entry, the key is followed by
161 4 bytes block number and optionally bt->vsz qualifying value
165 block base overhead LBT_DOFF is 16 bytes
167 Dict entry is 4 bytes
168 max. used stack space per entry is:
169 inner node: kln+4+bt->vsz <= 255+4+23 = 282
170 new leaf entry (one value): kln+bt->vsz <= 278
171 max total space for a new entry incl. dict is 286 < avail size/3
174 typedef struct Chunk {
180 /* number of levels for lru list */
181 LBT_LRULEV = sizeof(((Idx*)0)->lru)/sizeof(((Idx*)0)->lru[0]),
182 LBT_ENTBITS = 10, /* bits for # enries */
183 LBT_ENTMASK = 0x03ff,
184 LBT_ENTMAX = LBT_ENTMASK,
185 LBT_MATCH = 0x8000, /* entry matched */
186 LBT_ENTSIZE = 8 /* minimum cost of entry: 4 byte dict, 4 byte val */
190 typedef struct Batch {
192 unsigned got; /* current for this key */
193 unsigned tot; /* total for this key */
210 LBT_RD, /* read -- no lock */
212 LBT_EX /* exclusive */
216 typedef struct { /* structure filled by findkey */
217 Idx *bt; /* the tree */
218 Key *key; /* the key searched */
219 Block *b[LBT_LEVELS]; /* path to leaf, b[0] is leaf */
220 unsigned n[LBT_LEVELS]; /* block numbers of path */
221 unsigned short e[LBT_LEVELS]; /* entries in path */
232 /* buffer to print a value to.
233 lowest 11 bytes are printed as 4 shorts + one int < 4*5+10 = 30 digits+5sep
234 higher 12 bytes are printed hex = 24 bytes.
236 typedef char PrVal[64];
239 /* ************************************************************
244 stat access is not guarded by memory barrier.
245 may be wrong when running on multiple processors
250 /* ************************************************************
255 static unsigned GETINT ( void *m ) /* bus error safe version of *(int*) */
261 static void SETINT ( void *m, unsigned i )
266 #define GETINT( m ) (*(unsigned*)(m))
267 #define SETINT( m, i ) (*(unsigned*)(m) = (i))
270 static void mkval ( Val *v, Hit *ifp )
272 unsigned char *c = v->byt;
274 default: memset( c, 0, v->len-11 ); c += v->len-11;
275 case 11: *c++ = (unsigned char) (ifp->dbn >> 8);
276 case 10: *c++ = (unsigned char) ifp->dbn;
277 case 9: *c++ = (unsigned char) (ifp->mfn >> 24);
278 case 8: *c++ = (unsigned char) (ifp->mfn >> 16);
279 case 7: *c++ = (unsigned char) (ifp->mfn >> 8);
280 case 6: *c++ = (unsigned char) ifp->mfn;
281 case 5: *c++ = (unsigned char) (ifp->tag >> 8);
282 case 4: *c++ = (unsigned char) ifp->tag;
283 case 3: *c++ = (unsigned char) ifp->occ;
284 case 2: *c++ = (unsigned char) (ifp->pos >> 8);
285 case 1: *c++ = (unsigned char) ifp->pos;
291 static Hit *rdval ( Hit *ifp, unsigned char *c, unsigned vsz )
293 ifp->mfn = ifp->tag = ifp->occ = ifp->pos = ifp->dbn = 0;
295 default: c += vsz-11;
296 case 11: ifp->dbn = (unsigned short)*c++ << 8;
297 case 10: ifp->dbn |= *c++;
298 case 9: ifp->mfn = (unsigned int)*c++ << 24;
299 case 8: ifp->mfn |= (unsigned int)*c++ << 16;
300 case 7: ifp->mfn |= (unsigned int)*c++ << 8;
301 case 6: ifp->mfn |= *c++;
302 case 5: ifp->tag = (unsigned short)*c++ << 8;
303 case 4: ifp->tag |= *c++;
304 case 3: ifp->occ = *c++;
305 case 2: ifp->pos = (unsigned short)*c++ << 8;
306 case 1: ifp->pos |= *c++;
313 static char *prval ( PrVal buf, unsigned char *c, unsigned vsz )
325 while ( 11 < vsz-- ) {
326 *d++ = luti_hex[ *c >> 4 ];
327 *d++ = luti_hex[ *c++ & 0xf ];
332 d += u2a( d, h.dbn ); *d++ = '.';
333 d += u2a( d, h.mfn ); *d++ = '.';
334 d += u2a( d, h.tag ); *d++ = '.';
335 d += u2a( d, h.occ ); *d++ = '.';
336 d += u2a( d, h.pos ); *d++ = '.';
341 /* for PRVAL f->key->val */
342 #define PRVAL( buf, v ) prval( buf, v.byt, v.len )
345 /* ************************************************************
346 block I/O & cache mgmt
349 static void iniblk ( Block *b, Idx *bt, unsigned num, unsigned char lev )
351 memset( b, 0, LBT_BOFF + bt->bsz + LBT_GUARD );
361 static int chkblk ( Block *b, Idx *bt )
367 unsigned short eod = LBT_DOFF + b->ent*sizeof(Dict), siz, pos, vln, key;
368 unsigned end = bt->bsz, tot = 0;
369 unsigned char *base = LBT_BASE(b);
371 if ( memcmp( guard, b->grd, LBT_GUARD ) )
372 return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
373 if ( memcmp( guard, LBT_BASE(b)+bt->bsz, LBT_GUARD ) )
374 return log_msg( ERR_TRASH, "OOPS! initial guard frobbed on blk %d", b->num );
376 if ( b->num > bt->len || (b->num && b->nxt > bt->len) )
377 return log_msg( ERR_TRASH,
378 "bad num %d / nxt %d on lev %d", b->num, b->nxt, b->lev );
379 if ( b->typ != bt->typ || b->key != bt->key || b->col != bt->col )
380 return log_msg( ERR_TRASH,
381 "bad sig 0x%02x,%d,%d in block %d want 0x%02x,%d,%d",
382 b->typ, b->key, b->col, b->num, bt->typ, bt->key, bt->col );
383 if ( b->ent > bt->bsz / LBT_ENTSIZE || b->stk > bt->bsz || b->stk < eod )
384 return log_msg( ERR_TRASH,
385 "bad ent/stk %d,%d in block %d eod %d sz %d",
386 b->ent, b->stk, b->num, eod, bt->bsz );
387 for ( i=0; i<b->ent; i++ ) {
388 Dict *d = &b->dct[i];
392 siz = key + (b->lev ? vln+4u : vln * bt->vsz);
393 if ( pos < b->stk || end != (unsigned)pos + siz )
394 return log_msg( ERR_TRASH,
395 "bad pos/vln/key %d/%d/%d in entry %d of block %d"
396 " stack %d sz %d end %d expected %d",
397 pos, vln, key, i, b->num, b->stk, siz, pos+siz, end );
400 if ( vln && vln != +bt->vsz )
401 return log_msg( ERR_TRASH,
402 "bad vln %d in entry %d of block %d vsz %d",
403 vln, i, b->num, bt->vsz );
404 lnk = GETINT( base + pos + key + vln );
405 if ( lnk >= bt->len )
406 return log_msg( ERR_TRASH,
407 "bad link %u in entry %d of block %d",
410 #if 1 && ! defined( NDEBUG )
412 unsigned short op = LBT_POS( d-1 ), okl = d[-1].kln;
413 int mc = memcmp( base+pos, base+op, key<okl ? key : okl ), j;
414 if ( 0 < mc || (!mc && 0 < (mc = (int)key - (int)okl)) )
417 if ( !mc && b->lev && vln ) { /* compare values */
418 unsigned ovln = d[-1].vln;
419 if ( ! ovln || 0 < (j = memcmp( base+pos+key, base+op+okl, vln )) )
422 return log_msg( ERR_TRASH,
423 "bad key order entry %d of block %d lev %d"
424 " mc %d j %d key %d okl %d vln %d",
425 i, b->num, b->lev, mc, j, key, okl, vln
433 if ( tot > bt->bsz - b->stk )
434 return log_msg( ERR_TRASH,
435 "bad total entry size %d in block %d stack %d",
436 tot, b->num, b->stk );
438 if ( LOG_DO( LOG_TRACE ) ) {
440 "block %d lev %d sig 0x%02x,%d,%d ent %d eod %d stk %d",
441 b->num, b->lev, b->typ, b->key, b->col, b->ent, eod, b->stk
443 for ( i=0; i<2; i++ ) {
444 int j = i ? b->ent-1 : 0;
445 Dict *d = &b->dct[j];
446 if ( i >= b->ent ) /* if b->ent is 0 or 1, log no or one key */
451 end = pos + b->lev ? vln+4u : vln * bt->vsz;
454 log_msg( LOG_TRACE, "key %d '%.*s'/%d -> %d",
455 j, key, base+pos, vln, GETINT(base+pos+key+vln) );
458 rdval( &h, base+pos+key, bt->vsz );
459 log_msg( LOG_TRACE, "key %d '%.*s'[%d] = %d[%d,%d,%d]",
460 j, key, base+pos, vln, h.mfn, h.tag, h.occ, h.pos );
470 static Block *addchunk ( Idx *bt, unsigned cnt )
472 unsigned ibsz = LBT_BOFF + bt->bsz + LBT_GUARD; /* internal block size */
473 /* increase cnt so it's > 0 and add one extra to return */
474 char *mem = (char *)mAlloc( sizeof(Chunk) + (1 + ++cnt)*ibsz );
475 Chunk *chk = (Chunk *)mem;
478 /* chain the chunk */
481 mem += sizeof(Chunk);
483 /* add cnt blocks to level 0 lru list */
484 if ( ! bt->mru[0] ) /* level 0 cache is empty */
485 bt->mru[0] = (Block *)mem;
486 for ( ; cnt--; mem += ibsz ) {
487 Block *b = (Block *)mem;
488 iniblk( b, bt, 0 , 0 ); /* initialise empty block */
492 iniblk( (Block *)mem, bt, 0 , 0 );
497 static int readblk ( Block *b, Idx *bt, unsigned num )
499 int got = lio_pread( &bt->fd, LBT_BASE(b), bt->bsz, num*bt->bsz );
500 log_msg( LOG_VERBOSE, "reading block %u got %u", num, got );
501 if ( bt->bsz != (unsigned)got )
502 return log_msg( LOG_SYSERR, "\twhen reading bt block %d", num );
504 return log_msg( ERR_TRASH, "bad num %d in block %d", b->num, num );
505 return chkblk( b, bt );
509 static void unlock ( Block *b )
513 (void)LIO_WAKE( b->num );
516 #define ULK( b ) do { \
517 if ( (b)->lck ) { (b)->lck = 0; if ( b->wlk ) unlock( b ); } \
521 static int writeblk ( Block *b, Idx *bt, int ulk )
524 if ( chkblk( b, bt ) ) /* don't write no shize */
525 return log_msg( ERR_TRASH, "\tbefore writing bt block %d", b->num );
526 got = lio_pwrite( &bt->fd, LBT_BASE(b), bt->bsz, b->num*bt->bsz );
527 log_msg( LOG_VERBOSE, "writing block %u got %u", b->num, got );
530 if ( bt->bsz != (unsigned)got )
531 return log_msg( LOG_SYSERR, "\twhen writing bt block %d", b->num );
535 #define WRULK( b, bt ) writeblk( b, bt, 1 )
538 #define NEWBLK( bt, lev, lock ) getblk( bt, bt->len, lev, lock )
541 1) locate block #num in cache.
543 if it is READING, or we want to lock and it is locked,
544 incr # waiters, wait for it.
545 3) if it is not found:
546 locate a free cache entry, extending the cache if necessary,
547 set the num, set the READING flag, go read it.
548 after reading, if there are waiters, wake them up.
549 since we are the first to get a hold on the block,
550 we may write lock it, if we want.
553 static Block *getblk ( Idx *bt, unsigned num,
554 unsigned char lev, lbt_get lock )
557 stat.get++; /* dirty */
558 if ( !(stat.get & 0x3fff) ) { /* every 16K */
559 static unsigned lmiss;
561 "searched %i keys in %u blks cache %d %d%%"
562 " depth %u cmp/get/miss %u/%u/%u %d%% (%d %d%%)",
563 stat.key, bt->len, bt->clen, bt->clen*100 / bt->len,
564 bt->root->lev, stat.cmp, stat.get,
565 stat.miss, stat.miss*100 / stat.get,
566 (stat.miss - lmiss), (stat.miss - lmiss)*100 / 0x4000
570 startover: /* only used on absurd congestion */
572 LOG_DBG( LOG_DEBUG, "get bt block %u %u %d", num, lev, lock );
575 else if ( num < bt->len ) {
576 b = bt->hash[ num % bt->hlen ];
577 while ( b && num != b->num )
581 while ( b->ird || (lock && b->lck) ) {
583 int wlk = lock && b->lck;
584 LOG_DBG( LOG_VERBOSE, "lck on block %u %u r%d l%d",
585 num, lev, b->ird, b->lck );
586 if ( LBT_EX <= lock ) {
587 log_msg( LOG_ERROR, "concurrent mod on EX lock" );
591 log_msg( LOG_FATAL, "bad MT environment: concurrent mod, no lock" );
594 if ( wrd && 0xff != b->wrd )
596 if ( wlk && 0xff != b->wlk )
598 (void)LIO_WAIT( num ); /* zzzzz .... sleep tight */
599 /* some hours later */
600 if ( num != b->num ) {
601 log_msg( LOG_WARN, "heavy congestion: block %d has gone", num );
609 else { /* not found or new block */
610 /* find free block */
611 int i = 0, llev, err = 0;
612 stat.miss++; /* dirty */
613 LOG_DBG( LOG_DEBUG, "bt block %u %u not found", num, lev );
614 for ( ; i < (int)LBT_LRULEV; i++ ) {
615 /* check lru list for level i */
616 Block **bp = &bt->lru[i];
617 while ( (b = *bp) && LBT_BUSY(b) )
619 if ( ! b ) /* try next level */
621 *bp = b->nru; /* dequeue */
622 if ( bt->mru[i] == b ) /* was the tail */
627 log_msg( LOG_WARN, "no free blocks in cache" );
628 if ( !(b = addchunk( bt, 64 )) )
630 /* TODO: should we also extend the hash ? */
631 } else if ( b->num ) { /* got block -- should be hashed somewhere */
632 Block **bp = &bt->hash[ b->num % bt->hlen ];
633 while ( *bp && *bp != b )
636 *bp = b->nhs; /* dehash */
637 /* else ... hmmm should have found it */
639 assert( ! LBT_BUSY( b ) );
640 iniblk( b, bt, num, lev );
641 /* hash it, so waiters can find it */
642 b->nhs = bt->hash[ b->num % bt->hlen ];
643 bt->hash[ b->num % bt->hlen ] = b;
644 if ( num == bt->len ) { /* extend */
647 /* TODO: should be configurable */
648 if ( bt->len > bt->clen << 5 && bt->clen < 0x4000 )
649 addchunk( bt, bt->clen / 4 );
653 err = readblk( b, bt, num );
656 if ( ! err && lev != b->lev )
657 err = log_msg( ERR_TRASH, "bad lev %d in block %d want %d",
658 b->lev, b->num, lev );
660 Block **bp = &bt->hash[ num % bt->hlen ];
661 while ( *bp && *bp != b )
664 *bp = b->nhs; /* dehash */
665 b->lev = 0; /* add to level 0 lru list */
666 b->num = 0; /* tell others that it isn't the expected block */
669 /* add to it's levels lru list */
670 llev = LBT_LRULEV > b->lev ? b->lev : LBT_LRULEV-1;
671 if ( bt->mru[ llev ] )
672 bt->mru[ llev ]->nru = b;
675 (bt->mru[ llev ] = b)->nru = 0;
677 (void)LIO_WAKE( num );
679 b->wrd = b->wlk = 0; /* mark block unused */
682 } /* got free block */
683 if ( LBT_WR == lock ) {
685 log_msg( LOG_FATAL, "bt consi: unexpected lock on %u", num );
694 /** binsearch position for key.
695 position is either the index of the matching entry (0..b->ent-1)
696 or the index (0..b->ent) where key should be inserted.
697 @return position for key (0..b->ent), |ifyed with LBT_MATCH, if found
699 static unsigned short keypos (
702 #define KEYPOS( bt, b, key ) keypos( bt, b, key )
704 #define KEYPOS( bt, b, key ) keypos( b, key )
708 unsigned char *base = LBT_BASE( b );
709 unsigned char *kbt = key->byt;
710 unsigned char kln = key->len;
711 unsigned short l = 0, r = b->ent;
713 /* binsearch invariant:
714 key is < all entries i >= r (we do know relation at r)
715 key is > all entries i < l (don't know relation at l)
717 while ( l < r ) { /* invariant: l <= keypos < r */
718 unsigned short i = (l + r) >> 1; /* l <= i < r */
719 unsigned short pos = LBT_POS(&d[i]);
720 unsigned char cln = d[i].kln;
723 bt->cmp ? bt->cmp( kbt, base+pos, kln<cln ? kln : cln ) :
725 memcmp( kbt, base+pos, kln<cln ? kln : cln );
726 stat.cmp++; /* dirty */
727 if ( ! cmp && !(cmp = (int)kln - cln) ) {
728 /* match on main key -- what about the value ? */
729 if ( ! b->lev /* ignore on leaves -- no duplicate keys here */
730 || (!d[i].vln && !key->val.len) /* no value */
732 return LBT_MATCH | i;
733 /* so we're on an inner node */
734 if ( ! d[i].vln ) /* common case */
736 else if ( ! key->val.len )
739 unsigned char vln = LBT_VLN(&d[i]);
740 int c = key->val.len < d[i].vln ? key->val.len : vln;
741 if ( !(cmp = memcmp( key->val.byt, base+pos+cln, c ))
742 && !(cmp = (int) key->val.len - vln)
744 return LBT_MATCH | i;
756 /** find position for (leaf) val.
757 search v in array b of n values of size sz
758 @return position for value (0..n), |ifyed with LBT_MATCH, if found
760 static unsigned short valpos ( unsigned char *b,
761 unsigned short n, unsigned char sz, unsigned char *v )
763 unsigned short l = 0, r = n;
764 /* binsearch invariant:
765 v is < all values i >= r (we do know relation at r)
766 v is > all values i < l (don't know relation at l)
768 while ( l < r ) { /* invariant: l <= valpos < r */
769 unsigned short i = (l + r) >> 1; /* l <= i < r */
770 int cmp = memcmp( v, b+i*sz, sz );
772 return LBT_MATCH | i;
783 static void TRACEKEY ( unsigned lev, FindKey *f )
790 if ( LOG_DONT( lev ) )
792 log_msg( lev, "key '%.*s' vln %d val %s",
793 f->key->len, f->key->byt, f->key->val.len,
794 PRVAL( pv, f->key->val ) );
795 if ( f->key->val.len ) {
796 rdval( &h, f->key->val.byt, f->key->val.len );
797 log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
798 h.dbn, h.mfn, h.tag, h.occ, h.pos );
800 for ( i=LBT_LEVELS; i--; ) {
806 d = b->dct + (e = (LBT_ENTMASK & f->e[i]));
808 n = GETINT( LBT_BASE(b)+d->pos+d->kln+d->vln );
809 log_msg( lev, "lev %d n %d e 0x%x = %d %c '%.*s' val %d %s -> %d%s",
810 i, f->n[i], f->e[i], e, (LBT_MATCH & f->e[i]) ? 'M' : '-',
811 d->kln, LBT_BASE(b)+d->pos, d->vln,
812 prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln ),
813 n, on == b->num ? "" : " LY!"
816 rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
817 log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
818 h.dbn, h.mfn, h.tag, h.occ, h.pos );
822 log_msg( lev, "\tleft '%.*s' val %d %s",
823 d->kln, LBT_BASE(b)+d->pos, d->vln,
824 prval( pv, LBT_BASE(b)+d->pos+d->kln, d->vln )
827 rdval( &h, LBT_BASE(b)+d->pos+d->kln, d->vln );
828 log_msg( lev, "\tdb %d mfn %d tag %d occ %d pos %d",
829 h.dbn, h.mfn, h.tag, h.occ, h.pos );
835 #define TRACEKEY( l, f ) (void)(f)
839 static int findkey ( FindKey *f, lbt_get lock )
842 Block *b = f->bt->root;
843 unsigned char *pkey = 0, pkln = 0, pvln = 0;
844 if ( b->lev >= LBT_LEVELS )
845 return log_msg( ERR_TRASH, "tree is too deep: %u", b->lev );
847 for ( i=LBT_LEVELS; i--; ) { /* paranoia */
849 f->n[i] = f->e[i] = 0;
858 f->n[i] = (f->b[i] = b)->num;
859 f->e[i] = e = KEYPOS( f->bt, b, f->key );
860 LOG_DBG( LOG_DEBUG, "keypos %x", e );
862 if ( pkey && b->ent ) {
863 /* compare parent key against our first key
864 pkey must be equal for inner node and not greater for leave
866 int cmp; /* this - pkey >= 0 */
867 unsigned char *ent, ln;
869 ent = LBT_BASE(b)+LBT_POS(d);
870 ln = pkln < d->kln ? pkln : d->kln;
871 cmp = ln ? memcmp( ent, pkey, ln ) : 0;
873 cmp = (int)d->kln - (int)pkln;
875 if ( ! d->vln || ! pvln )
876 cmp = (int)d->vln - (int)pvln;
878 cmp = memcmp( ent+d->kln, pkey+pkln, f->bt->vsz );
880 if ( cmp && (i || 0 > cmp) ) {
883 TRACEKEY( ERR_TRASH, f );
885 rdval( &h, ent+d->kln, d->vln );
886 log_msg( ERR_TRASH, "\tfirst val db %d mfn %d tag %d occ %d pos %d",
887 h.dbn, h.mfn, h.tag, h.occ, h.pos );
889 return log_msg( ERR_TRASH, "\tbad first '%.*s' val %d %s",
890 d->kln, ent, d->vln, prval( pv, ent+d->kln, d->vln ) );
895 here goes the Lehmann/Yao race detection:
896 If we ran on right boundary, follow right link
898 while ( !(LBT_MATCH & e) && b->ent == e && b->nxt && b->num ) {
901 b->lck = LBT_WR; /* hold b while checking s */
902 s = getblk( f->bt, b->nxt, i, i ? LBT_RD : lock );
907 se = KEYPOS( f->bt, s, f->key );
908 if ( !se ) { /* lower than first */
912 LOG_DBG( LOG_DEBUG, "LYao %d -> %d lev %d for '%.*s' got 0x%x",
913 b->num, s->num, i, f->key->len, f->key->byt, se );
916 f->n[i] = (f->b[i] = b = s)->num;
922 if ( !(LBT_MATCH & e) ) {
926 TRACEKEY( ERR_TRASH, f );
927 return log_msg( ERR_IDIOT,
928 "ran on left bound of block %u ent %u",
932 d = b->dct + (LBT_ENTMASK & e);
933 /* direct access ok in inner node */
934 pkey = LBT_BASE(b)+d->pos;
937 n = GETINT( pkey+pkln+pvln );
939 b = getblk( f->bt, n, i, i ? LBT_RD : lock );
941 return -1; /* go away */
945 static int mktree ( Idx *bt )
948 unsigned char lev = 0; /* the level to be indexed */
949 unsigned left = 1; /* block on child level */
951 unsigned size = bt->len * bt->bsz;
952 unsigned sval = ba->vals * bt->vsz;
953 unsigned sove = bt->len * LBT_DOFF + sizeof(Dict)*(ba->keys+ba->span);
962 "\t#blks %u total len %u free %u (%.1f%%) overhead %u (%.1f%%)\n"
963 "\t#keys %u total len %u (%.1f%%) avg %.1f max %u\n"
964 "\t#vals %u total len %u (%.1f%%) avg %.1f max %u spans %u",
965 bt->len, size, ba->free, ba->free*100./size, sove, sove*100./size,
966 ba->keys, ba->keyl, ba->keyl*100./size, (float)ba->keyl/ba->keys, ba->maxl,
967 ba->vals, sval, sval*100./size, (float)ba->vals/ba->keys, ba->maxv, ba->span
971 for (;;) { /* levels */
972 unsigned len = 0; /* current is n'th block on level */
974 unsigned have = 0; /* room in current block b */
975 unsigned char*base = 0; /* of current block b */
976 Dict *d = 0; /* of current block b */
977 unsigned next = left; /* child */
980 /* get child c and create child pointer in block b */
982 unsigned char vln = 0;
986 Block *c = getblk( bt, next, lev, LBT_EX );
988 return log_msg( ERR_NOMEM, "no child block" );
990 if ( ! c->ent ) /* OOPS empty node */
992 key = LBT_BASE(c)+LBT_POS(c->dct);
994 if ( ! b ) /* the leftmost child on this level */
995 kln = 0; /* always 0 key on left */
997 vln = LBT_VLN(c->dct);
998 else if ( ba->key.len == kln /* compare key to last of prev. */
999 && !memcmp( ba->key.byt, key, kln ) /* which was saved in ba->key */
1000 ) { /* key spans blocks */
1001 LOG_DBG( LOG_DEBUG, "found span key '%.*s' child %u",
1003 vln = bt->vsz; /* qualify with value */
1006 unsigned char x = 0;
1007 /* find prefix key */
1008 if ( kln > ba->key.len )
1010 while ( x < kln && key[x] == ba->key.byt[x] )
1012 if ( x < kln ) /* they differ at [x] */
1014 else /* equal on the shorter length */
1016 if ( kln > c->dct[0].kln )
1017 return log_msg( ERR_TRASH,
1018 "bad key '%.*s' on level %u matching previous of length %u",
1019 c->dct[0].kln, key, lev, ba->key.len );
1020 LOG_DBG( LOG_DEBUG, "prefix on key '%.*s' child %u save %u",
1021 kln, key, c->num, c->dct[0].kln - kln );
1022 ipfx += c->dct[0].kln - kln;
1028 need = sizeof(Dict) + kln + vln + 4;
1029 if ( need > have ) {
1031 log_msg( LOG_VERBOSE,
1032 "level %u block %u is full %u ent stack %u",
1033 lev, b->num, b->ent, b->stk );
1034 b->nxt = bt->len; /* link */
1036 if ( WRULK( b, bt ) ) goto nowrite;
1039 c->lck = LBT_EX; /* paranoia */
1040 b = NEWBLK( bt, lev+1, LBT_EX );
1043 return log_msg( ERR_NOMEM, "no free block" );
1044 if ( b->ent || b->stk != bt->bsz )
1045 return log_msg( ERR_TRASH,
1046 "new block %u is not empty: ent %u stk %u",
1047 b->num, b->ent, b->stk );
1048 base = LBT_BASE( b );
1049 b->lck = LBT_EX; /* lock */
1050 have = bt->bsz - LBT_DOFF; /* - ba->let; TODO: should we ??? */
1055 SETINT( base+(b->stk -= 4), c->num );
1058 if ( kln += vln ) /* key + consecutive qualifying value */
1059 memcpy( base+(b->stk -= kln), key, kln );
1065 /* save the last dance for me */
1066 memcpy( ba->key.byt, LBT_BASE(c)+LBT_POS(&c->dct[c->ent-1]),
1067 ba->key.len = c->dct[c->ent-1].kln );
1070 return log_msg( ERR_TRASH, "level %u was empty", lev );
1072 log_msg( LOG_INFO, "closing level %u with %u blocks %u..%u",
1073 lev, len, left, b->num );
1076 memcpy( bt->root, b, LBT_BOFF + bt->bsz );
1080 if ( WRULK( b, bt ) ) goto nowrite;
1086 "\tused %u inner blocks %u keys %u bytes avg %.1f\n"
1087 "\t%u #spans (%.1f%%) %u pfx (- %.1f%%)",
1088 iblk, ikey, ikeyl, (float)ikeyl/ikey,
1089 ispn, ispn*100.*bt->vsz/ikeyl, ipfx, ipfx*100./(ipfx+ikeyl) );
1095 return log_msg( LOG_SYSERR, "\twhen writing batch block" );
1100 /* ************************************************************
1106 /* ************************************************************
1110 int lbt_batch ( Idx *bt, unsigned char pctfree )
1112 bt->bat = (Batch*)mAlloc( sizeof(Batch) );
1114 return log_msg( ERR_NOMEM, "\twhen getting batch" );
1115 memset( bt->bat, 0, sizeof(Batch) );
1118 bt->bat->let = pctfree * bt->bsz / 100;
1119 lio_trunc( &bt->fd, 0 );
1120 iniblk( bt->root, bt, 0, 0 );
1121 /* TODO: kill hash */
1123 bt->bat->cur = NEWBLK( bt, 0, LBT_EX );
1128 int lbt_batchval ( Idx *bt, Key *key )
1131 Batch *ba = bt->bat;
1132 if ( key != &ba->key ) {
1133 int l = key->len < ba->key.len ? key->len : ba->key.len; /* min */
1134 if ( ! (cmp = memcmp(key->byt, ba->key.byt, l)) )
1135 cmp = (int)key->len - (int)ba->key.len;
1137 return log_msg( LOG_ERROR, "batch key '%.*s' < last '%.*s'",
1138 key->len, key->byt, ba->key.len, ba->key.byt );
1141 while ( ! key->val.len /* end */
1142 || LBT_ENTMAX == ba->got /* full */
1143 || cmp /* next key */
1146 unsigned have = b->stk - LBT_DOFF - b->ent*sizeof(Dict) - ba->let;
1147 unsigned need = sizeof(Dict) + ba->key.len;
1148 unsigned want = need + ba->got * bt->vsz;
1149 unsigned take = ba->got;
1150 if ( have < want ) { /* you can't always get ... */
1154 take = (have - need) / bt->vsz;
1155 if ( 256 > take * bt->vsz )
1159 #if 0 /* prefix key turbo ??? -- doesn't seem to work all too well */
1160 if ( have < (bt->bsz >> 1) ) { /* ready to waste half a block ? */
1161 Dict *d = &b->dct[ b->ent-1 ];
1162 unsigned char *last = LBT_BASE(b)+LBT_POS(d);
1163 if ( ba->key.byt[0] != last[0]
1164 || ba->key.byt[1] != last[1]
1165 || ba->key.byt[2] != last[2]
1170 LOG_DBG( LOG_DEBUG, "batch '%.*s' got %u need %u want %u have %u take %u",
1171 ba->key.len, ba->key.byt, ba->got, need, want, have, take );
1172 /* we have enough room in b for take entries */
1174 unsigned char *base = LBT_BASE( b );
1175 unsigned char *s = base + b->stk;
1177 Dict *d = &b->dct[ b->ent++ ];
1179 memcpy( s -= bt->vsz, ba->val[i].byt, bt->vsz );
1180 memcpy( s -= ba->key.len, ba->key.byt, ba->key.len );
1181 b->stk = (unsigned short) (s - base);
1182 LBT_PV( d, b->stk, take );
1183 d->kln = ba->key.len;
1184 if ( ba->got > take )
1185 memcpy( ba->val, ba->val+take, sizeof(ba->val[0])*(ba->got - take) );
1190 if ( take ) /* key spans blocks */
1192 ba->free += b->stk - LBT_DOFF - b->ent*sizeof(Dict); /* wasted */
1193 b->nxt = bt->len; /* link */
1194 LOG_DBG( LOG_DEBUG, "batch: %u entries left, getting block %u",
1196 if ( WRULK( b, bt ) )
1197 return log_msg( LOG_SYSERR, "\twhen writing batch block" );
1198 /* so couldn't flush all, thus we need another block */
1199 b = ba->cur = NEWBLK( bt, 0, LBT_EX );
1200 /* will continue on end or keychange */
1201 if ( b->ent || b->stk < bt->bsz )
1202 return log_msg( ERR_TRASH, "new block %u is not empty: ent %u stk %u",
1203 b->num, b->ent, b->stk );
1204 LOG_DBG( LOG_VERBOSE, "new block %u has %u ent %u stk, will keep %u",
1205 b->num, b->ent, b->stk, ba->let );
1207 if ( cmp || ! key->val.len ) /* done with key: stats */
1208 if ( ba->maxv < ba->tot )
1210 if ( key->val.len ) {
1211 if ( cmp ) { /* set new key */
1213 return log_msg( ERR_IDIOT, "batch not empty on new key" );
1215 memcpy( ba->key.byt, key->byt, ba->key.len = key->len );
1217 ba->keyl += key->len;
1218 if ( ba->maxl < key->len )
1219 ba->maxl = key->len;
1222 /* LOG_HEX( key->val.byt, bt->vsz ); */
1223 memcpy( ba->val[ ba->got++ ].byt, key->val.byt, bt->vsz );
1227 WRULK( ba->cur, bt );
1232 } /* lbt_batchval */
1235 /* TODO: this function is an awful mess and needs complete rewrite, KR
1237 int lbt_add ( Idx *bt, Key *key )
1240 unsigned short e, i = 0;
1241 Block *b, *s = 0; /* mainblock, sibling */
1243 unsigned char *base, *sbase, *src, *v, *stk;
1245 int hadkey, split = 0, insibling = 0;
1251 if ( bt->key && key->len > bt->key )
1253 if ( findkey( &f, LBT_WR ) ) {
1257 LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1259 e = (LBT_ENTMASK & f.e[0]);
1263 if ( ! (hadkey = LBT_MATCH & f.e[0]) ) /* new key */
1264 need = key->len + sizeof(Dict) + bt->vsz;
1266 unsigned vln = LBT_VLN(d);
1267 v = base+LBT_POS(d)+d->kln;
1268 if ( 1 == vln ) { /* check stopword -- value entirely zeroed */
1270 unsigned char *C = v;
1271 while ( l-- && !*C++ ) /* huh huh, he said "C++" */
1274 LOG_DBG( LOG_VERBOSE, "key '%.*s' is a stopword", key->len, key->byt );
1278 i = valpos( v, vln, bt->vsz, key->val.byt );
1279 if ( LBT_MATCH & i ) {
1281 "adding key '%.*s'/%s: already there pos %u",
1282 key->len, key->byt, PRVAL( pv, key->val ), i & ~LBT_MATCH );
1283 goto done; /* value already there */
1288 "adding key '%.*s'/%s in block %d had %d ent %d stk %d need %d",
1289 key->len, key->byt, PRVAL( pv, key->val ),
1290 b->num, hadkey, b->ent, b->stk, need );
1292 if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
1293 /* now that's bad: not enough space
1294 create new block s, move last entries there
1295 starting from the end, entries (including the new one) are collected,
1296 until their combined size reaches more than half a block.
1297 If the combined size then is more than 2/3 a block,
1298 and the last entry alone takes more than 1/3 a block, it is split.
1299 TODO: check existing sibling for free space
1301 unsigned half = (bt->bsz - LBT_DOFF) >> 1;
1303 unsigned mv = b->ent;
1305 LOG_DBG( LOG_DEBUG, "splitting block %d need %d have %d (stk %d ent %d)",
1306 b->num, need, b->stk - LBT_DOFF - b->ent*sizeof(Dict), b->stk, b->ent );
1307 if ( need > (bt->bsz - LBT_DOFF) / 3 )
1308 LOG_OTO( done, (ERR_IDIOT, "OOPS! excessive need for %d bytes", need) );
1309 /* figure out split position */
1310 if ( !hadkey && b->ent == e ) {
1315 while ( d--, mv-- ) {
1316 unsigned sz = sizeof(Dict)+d->kln+LBT_VLN(d)*bt->vsz;
1317 if ( half > (tot += sz) ) { /* sz fits */
1318 if ( e == mv && !hadkey ) {
1320 check size of new entry.
1321 In case of hadkey, we don't bother for the bt->vsz bytes.
1323 if ( half < (tot + need) ) {
1324 if ( (half - tot) > need/2 ) { /* more balanced if we move */
1334 continue; /* else croak on ! mv below */
1336 /* exceeded half -- maybe we move entry mv anyway */
1337 if ( tot > (bt->bsz - LBT_DOFF)*2/3 ) {
1338 if ( tot-sz < (bt->bsz - LBT_DOFF)/3 )
1340 else { /* undo -- don't move this entry */
1345 LOG_OTO( done, (ERR_IDIOT, "OOPS! didn't find end tot %d bytes", tot) );
1348 if ( mv < e || (mv == e && hadkey && !split) )
1350 /* now mv is the first entry to move */
1351 s = NEWBLK( bt, 0, LBT_WR );
1353 "split: move %d of %d to new blk %d e %d tot %d insibling %d split %d",
1354 mv, b->ent, s->num, e, tot, insibling, split );
1357 sbase = LBT_BASE(s);
1359 /* now total includes full size of entry mv.
1360 if we included a new entry, it might even exceed bsz.
1361 calculate sizes for both blocks w/o mv's values.
1363 unsigned esz = sizeof(Dict)+d->kln; /* entry's base cost */
1364 unsigned vln = LBT_VLN(d);
1365 unsigned keep, diff, mvval, kpval, mvsiz, pos = LBT_POS(d);
1367 from currently used space, substract tot (including all
1368 values and possibly new key "need"), but add entry base cost.
1369 that should be the size remaining if we where moving
1370 all of the values but keep an entry and key.
1371 maybe slightly biased if need is bt->vsz (the hadkey case).
1373 keep = bt->bsz - b->stk + b->ent*sizeof(Dict) + need - tot + esz;
1374 tot -= vln * bt->vsz; /* undo entries */
1375 diff = (keep > tot ? keep - tot : tot - keep) / bt->vsz;
1377 LOG_OTO( done, (ERR_IDIOT,
1378 "OOPS! got diff %d vln %d entry %d tot %d keep %d",
1379 diff, vln, mv, tot, keep) );
1380 mvval = (vln - diff)/2;
1383 kpval = vln - mvval;
1384 if ( ! mvval || ! kpval ) {
1385 LOG_DBG( LOG_WARN, "bad mv/kpval %d/%d keep %d tot %d vln %d diff %d",
1386 mvval, kpval, keep, tot, vln, diff );
1388 kpval = vln - (mvval = 1);
1390 mvval = vln - (kpval = 1);
1392 tot += mvsiz = mvval*bt->vsz;
1393 keep += kpval*bt->vsz;
1396 "split: entry %d '%.*s' mv %d of %d values, i %d keep %d tot %d",
1397 mv, d->kln, src, mvval, vln, i, keep, tot );
1398 if ( keep > (bt->bsz-LBT_DOFF) || tot > (bt->bsz-LBT_DOFF) )
1399 LOG_OTO( done, (ERR_IDIOT,
1400 "OOPS! got diff %d vln %d mvval %d entry %d tot %d keep %d",
1401 diff, vln, mvval, mv, tot, keep) );
1403 memcpy( sbase + (s->stk -= mvsiz),
1404 src+d->kln+kpval*bt->vsz, mvsiz );
1405 if ( e == mv && hadkey && i > kpval ) {
1409 memmove( sbase+s->stk, sbase+s->stk+bt->vsz, i*bt->vsz );
1410 memcpy( sbase+s->stk+i*bt->vsz, key->val.byt, bt->vsz );
1413 memcpy( sbase + (s->stk -= d->kln), src, d->kln );
1414 LBT_PVK( s->dct, s->stk, mvval, d->kln );
1416 memmove( src + mvsiz, src, d->kln+kpval*bt->vsz );
1418 LBT_PV( d, pos, kpval );
1421 /* now entries mv..b->ent are moved, mv > 0 */
1422 b->stk = LBT_POS(b->dct+mv-1); /* pre-adjust stack */
1423 if ( mv < e ) { /* move entries mv..e-1 */
1424 /* used stack space is from pos of entry e-1 to end of entry mv
1425 (might differ from pos of mv-1 due to split)
1427 unsigned from = LBT_POS(b->dct+e-1), nent = e-mv,
1428 sz = LBT_POS(b->dct+mv)+b->dct[mv].kln
1429 +LBT_VLN(b->dct+mv)*bt->vsz - from;
1430 int adj = (s->stk -= sz) - from; /* entry position adjustment */
1431 memcpy( sbase + s->stk, base+from, sz );
1432 memcpy( s->dct+s->ent, b->dct+mv, nent*sizeof(Dict) );
1433 d = s->dct + s->ent;
1435 for ( ; nent--; d++ )
1436 LBT_POSADD( d, adj );
1438 if ( insibling && mv <= e ) { /* now mv or insert e, unless done on split */
1440 if ( hadkey ) { /* mv existing entry e >= mv, add new val */
1442 unsigned vln = LBT_VLN(d);
1443 src = base+LBT_POS(d)+d->kln;
1445 unsigned sz = (vln - i)*bt->vsz;
1446 memcpy( sbase + (s->stk -= sz), src + i*bt->vsz, sz );
1448 memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
1450 unsigned sz = i*bt->vsz;
1451 memcpy( sbase + (s->stk -= sz), src, sz );
1453 memcpy( sbase + (s->stk -= d->kln), base+LBT_POS(d), d->kln );
1454 LBT_PVK( s->dct + s->ent, s->stk, vln+1, d->kln );
1455 e++; /* e is moved */
1457 memcpy( sbase + (s->stk -= bt->vsz), key->val.byt, bt->vsz );
1458 memcpy( sbase + (s->stk -= key->len), key->byt, key->len );
1459 LBT_PVK( s->dct + s->ent, s->stk, 1, key->len );
1462 } /* mv or insert e */
1463 if ( e < b->ent && mv < b->ent ) { /* move entries max(e,mv) to end */
1464 unsigned fst = mv < e ? e : mv;
1465 /* used stack space is from pos of entry b->ent-1 (the original b->stk)
1466 to end of entry fst (might differ from pos of fst-1 due to split)
1468 unsigned from = LBT_POS(b->dct+b->ent-1), nent = b->ent-fst,
1469 sz = LBT_POS(b->dct+fst)+b->dct[fst].kln
1470 +LBT_VLN(b->dct+fst)*bt->vsz - from;
1471 int adj = (s->stk -= sz) - from; /* entry position adjustment */
1472 LOG_DBG( LOG_DEBUG, "split: entries %d..%d sz %d from %d to %d",
1473 fst, b->ent-1, sz, from, s->stk );
1474 memcpy( sbase + s->stk, base+from, sz );
1475 memcpy( s->dct+s->ent, b->dct+fst, nent*sizeof(Dict) );
1476 d = s->dct + s->ent;
1478 for ( ; nent--; d++ ) {
1479 #if !defined(NDEBUG)
1480 unsigned p = LBT_POS(d);
1482 LBT_POSADD( d, adj );
1483 #if !defined(NDEBUG)
1484 LOG_DBG( LOG_DEBUG, "new ent %d p/v/k %d/%d/%d o %d",
1485 d - s->dct, LBT_POS(d), LBT_VLN(d), d->kln, p );
1489 b->ent = mv; /* kill entries */
1491 "split: blks %d/%d with %d/%d entries, s starts '%.*s'",
1492 b->num, s->num, b->ent, s->ent, s->dct[0].kln, sbase+LBT_POS(s->dct) );
1495 if ( ! insibling ) {
1496 /* now we do have enough space */
1498 stk = base + b->stk; /* bottom of data stack */
1499 if ( hadkey ) { /* insert value at i */
1500 unsigned char *val = base + LBT_POS(d)+d->kln+i*bt->vsz;
1504 "in entry %d '%.*s' val %d: memmove( %p %p %d) val %p need %d",
1505 e, d->kln, base+LBT_POS(d), i,
1506 stk-need, stk, (val-stk)+need, val, need );
1508 memmove( stk - need, stk, (val - stk)+need );
1509 memcpy( val - need, key->val.byt, bt->vsz );
1511 } else { /* insert key, value and dict entry */
1512 /* take dict and data space from entry e */
1514 need = key->len + bt->vsz;
1516 nu = base + b->stk - need;
1518 /* at end of next entry, i.e. end of block or beg of previous */
1519 nu = base + LBT_POS(d) + d->kln + LBT_VLN(d)*bt->vsz;
1520 memmove( stk - need, stk, nu - stk );
1522 memmove( d+1, d, (b->ent - e)*sizeof(Dict) );
1524 memcpy( nu, key->byt, key->len );
1525 memcpy( nu + key->len, key->val.byt, bt->vsz );
1526 LBT_PV( d, nu-base, 1 );
1533 need is set to the needed (i.e. additionally used) data space,
1534 e is the first dict entry to redirect need bytes lower
1537 for ( ; e < b->ent; e++, d++ )
1538 LBT_POSADD( d, -need );
1540 while ( s ) { /* created sibling */
1541 unsigned num = s->num;
1542 unsigned char *e0 = LBT_BASE(s) + LBT_POS(s->dct);
1543 unsigned l = s->lev + 1;
1547 memcpy( k.byt, e0, k.len = s->dct[0].kln );
1549 if ( s->lev ? s->dct[0].vln : split )
1550 memcpy( k.val.byt, e0+k.len, k.val.len = bt->vsz );
1551 LOG_DBG( LOG_DEBUG, "split: had %d/%d on lev %d key '%.*s' vln %d %s",
1552 b->num, num, s->lev, k.len, k.byt, k.val.len, PRVAL( pv, k.val ) );
1555 if ( f.b[l]->num == f.n[l] && ! LBT_BUSY(f.b[l]) )
1556 f.b[l]->lck = LBT_WR;
1558 f.b[l] = getblk( bt, f.n[l], l, LBT_WR );
1560 "split: got parent %d lev %d", f.b[l]->num, f.b[l]->lev );
1561 /* MT TODO: again L/Y link hunt */
1562 /* ok, locked the parent -- release s and b */
1563 if ( (ret = WRULK( s, bt )) || (ret = WRULK( b, bt )) )
1568 e = KEYPOS( bt, b, &k );
1569 LOG_DBG( LOG_DEBUG, "split: got parent pos %d", e );
1570 if ( LBT_MATCH & e ) /* !??? */
1571 LOG_OTO( done, (ERR_TRASH, "OOPS! found key in parent" ) );
1573 TRACEKEY( ERR_TRASH, &f );
1574 LOG_OTO( done, (ERR_TRASH, "OOPS! parent insert at 0 lev %d", l ) );
1576 need = sizeof(Dict) + k.len + 4 + k.val.len;
1577 if ( need > b->stk - LBT_DOFF - b->ent*sizeof(Dict) ) {
1578 unsigned half = ((bt->bsz - b->stk)+b->ent*sizeof(Dict)+need) / 2;
1579 unsigned mv = b->ent, tot = 0, nent, sz;
1581 if ( e == b->ent ) {
1585 for ( d = b->dct+mv; d--, mv--; ) {
1586 sz = sizeof(Dict) + d->kln + 4 + d->vln;
1587 if ( half > (tot += sz) ) {
1589 if ( half > tot+need || tot+need-half < half-tot ) {
1591 ins = 0; /* new link goes to sibling */
1600 if ( tot - half > sz/2 ) { /* don't move this */
1603 } else if ( ! mv ) /* !??? */
1607 /* split here a little bit simpler */
1608 s = NEWBLK( bt, l, LBT_WR );
1611 /* straight move entries mv..b->ent-1 to sibling */
1612 sbase = LBT_BASE(s);
1614 sz = b->dct[mv-1].pos - b->stk;
1617 "split: parent lev %d split %d to new %d mv %d"
1618 " '%.*s'/%s nent %d sz %d",
1619 l, b->num, s->num, mv, d->kln, base+d->pos,
1620 prval( pv, base+d->pos+d->kln, d->vln ), nent, sz );
1621 memcpy( sbase + (s->stk -= sz), base + b->stk, sz );
1622 adj = s->stk - b->stk;
1624 memcpy( s->dct, d, nent*sizeof(Dict) );
1627 for ( d = s->dct; nent--; d++ )
1633 if ( (ret = chkblk( s, bt )) )
1636 /* now insert link in block ins at pos e */ {
1637 unsigned isz = k.len + 4 + k.val.len, nent = ins->ent - e;
1638 unsigned char *ibase = LBT_BASE(ins);
1639 unsigned end = e ? ins->dct[e-1].pos : bt->bsz;
1641 "split: ins parent %d lev %d pos %d key '%.*s' -> %d vln %d %s",
1642 ins->num, ins->lev, e, k.len, k.byt, num, k.val.len,
1643 PRVAL( pv, k.val ) );
1644 memmove( ibase + ins->stk - isz, ibase + ins->stk, end - ins->stk );
1646 memcpy( ibase + (end -= 4), &num, 4 );
1648 memcpy( ibase + (end -= k.val.len), k.val.byt, k.val.len );
1649 memcpy( ibase + (end -= k.len), k.byt, k.len );
1650 for ( d = ins->dct+ins->ent; nent--; d-- ) {
1658 if ( (ret = chkblk( ins, bt )) )
1661 if ( ! s ) /* write b below or in next turn */
1663 if ( ! b->num ) { /* root split */
1664 Block *bb = NEWBLK( bt, l, LBT_WR );
1665 LOG_DBG( LOG_DEBUG, "root split!" );
1666 /* cp all but the num */
1667 memcpy( LBT_BASE(bb)+4, LBT_BASE(b)+4, bt->bsz-4 );
1668 b->nxt = s->nxt; /* restore root's special nxt */
1670 b->stk = bt->bsz; /* empty the root */
1671 /* add two childs bb and s */
1672 SETINT( base + (b->stk -= 4), bb->num );
1673 LBT_PVK( b->dct, b->stk, 0, 0 );
1674 SETINT( base + (b->stk -= 4), s->num );
1675 e0 = LBT_BASE(s) + LBT_POS(s->dct);
1676 if ( s->dct[0].vln )
1677 memcpy( base + (b->stk -= bt->vsz), e0+s->dct[0].kln, bt->vsz );
1678 memcpy( base + (b->stk -= s->dct[0].kln), e0, s->dct[0].kln );
1679 LBT_PVK( b->dct+1, b->stk, s->dct[0].vln, s->dct[0].kln );
1681 b->lev++; /* increase tree depth */
1682 if ( (ret = WRULK( s, bt )) || (ret = WRULK( bb, bt )) )
1689 ret = WRULK( b, bt );
1695 if ( !ret && b->lev && findkey( &f, LBT_RD ) )
1696 ret = log_msg( ERR_TRASH, "could not find key after add" );
1701 int lbt_del ( Idx *bt, Key *key )
1704 unsigned short e, n, i, mv;
1707 unsigned char *base, *v;
1713 LOG_DBG( LOG_DEBUG, "deleting key '%.*s'/%s",
1714 key->len, key->byt, PRVAL( pv, key->val ) );
1715 if ( findkey( &f, LBT_WR ) )
1716 LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1717 if ( !(LBT_MATCH & f.e[0]) ) /* key not found - ok */
1719 LOG_DBG( LOG_DEBUG, "key found" );
1720 e = (LBT_ENTMASK & f.e[0]);
1724 v = base+LBT_POS(d)+d->kln; /* start of values */
1728 i = valpos( v, n, bt->vsz, key->val.byt );
1729 LOG_DBG( LOG_DEBUG, "val %sfound pos %u of %u",
1730 (LBT_MATCH & i) ? "" : "not ", i & ~LBT_MATCH, n );
1731 if ( !(LBT_MATCH & i) ) /* val not found - ok */
1735 "del blk %d e %d i/n %d/%d", b->num, e, i, n );
1737 v += i*bt->vsz; /* end of stack area to move: start of this value */
1738 mv = bt->vsz; /* amount to move: one value size */
1739 LBT_VLNADD( d, -1 );
1740 } else { /* sole value -- rm whole entry */
1741 v = base+LBT_POS(d); /* start of key */
1742 mv = d->kln+bt->vsz;
1743 for ( i=b->ent - e - 1; i--; d++ ) /* move the i entries after d down */
1745 d = b->dct + e; /* reset */
1748 memmove( base+b->stk+mv, base+b->stk, v-(base+b->stk) );
1750 for ( ; e++ < b->ent; d++ )
1751 LBT_POSADD( d, mv );
1761 int lbt_loop ( Idx *bt, DXLoop *l )
1765 int mode = IDXMODE & l->flg;
1768 if ( ! cmp->len ) /* start at 1st leaf */
1769 b = getblk( bt, 1, 0, LBT_RD );
1770 else { /* locate key */
1774 if ( findkey( &f, LBT_RD ) )
1775 LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1777 i = (unsigned)(LBT_ENTMASK & f.e[0]);
1778 if ( !(LBT_MATCH & f.e[0]) && IDXEQ == mode )
1781 if ( IDXUPTO <= mode )
1783 for ( ;; i=0 ) { /* blocks */
1784 unsigned char *base = LBT_BASE( b );
1786 if ( ! b || b->lev )
1788 b->lck = LBT_EX; /* lock */
1790 for ( ; i<b->ent; i++, d++ ) {
1791 unsigned short pos = LBT_POS( d );
1792 unsigned short vln = LBT_VLN( d );
1793 unsigned short kln = d->kln;
1795 unsigned char *v = base+pos+kln;
1798 int diff = memcmp( cmp->byt, base+pos,
1799 kln <= cmp->len ? kln : cmp->len );
1802 if ( diff || kln < cmp->len )
1806 if ( diff || kln != cmp->len )
1814 if ( 0 > diff || (!diff && kln > cmp->len) )
1817 default: /* unused */
1820 "loop ends on key '%.*s'(%d) %d %d",
1821 kln, base+pos, kln, diff, cmp->len );
1824 memcpy( key.byt, base+pos, key.len = kln );
1826 for ( j=0; j<vln; j++, v += bt->vsz ) {
1827 rdval( &hit, v, bt->vsz );
1828 ret = l->cb( l->me, &key, &hit );
1832 else { /* internal check */
1836 memcpy( key.val.byt, v, key.val.len = bt->vsz );
1837 if ( findkey( &f, LBT_RD ) )
1838 LOG_OTO( done, ( ERR_TRASH, "could not find key" ) );
1839 if ( !(LBT_MATCH & f.e[0])
1841 || i != (unsigned)(LBT_ENTMASK & f.e[0])
1844 "OOPS! want %u(%p).%u got %u(%p).%u (match %x)",
1845 b->num, b, i, f.b[0]->num, f.b[0], LBT_ENTMASK & f.e[0], f.e[0] );
1846 rdval( &hit, v, bt->vsz );
1848 "\twhen looking for '%.*s'/%u.%u.%u.%u",
1849 key.len, key.byt, hit.mfn, hit.tag, hit.occ, hit.pos );
1850 if ( f.b[0]->ent <= (LBT_ENTMASK & f.e[0]) )
1851 log_msg( ERR_TRASH, "\tI found nothing" );
1853 Dict *dd = f.b[0]->dct + (LBT_ENTMASK & f.e[0]);
1854 unsigned char *bb = LBT_BASE(f.b[0])+LBT_POS(dd);
1855 rdval( &hit, bb+dd->kln, bt->vsz );
1857 "\tI found '%.*s'/%u.%u.%u.%u",
1858 dd->kln, bb, hit.mfn, hit.tag, hit.occ, hit.pos );
1861 log_msg( LOG_TRACE, "key ok" );
1865 if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
1872 l->cb( l->me, 0, 0 );
1875 "checked %i keys in %u blks of %dK depth %u cmp/get/miss %u/%u/%u",
1876 stat.key, bt->len, bt->bsz>>10, bt->root->lev, stat.cmp, stat.get, stat.miss );
1881 /* mimik the plain old ldb_search */
1882 int lbt_search ( Idx *bt, Key *key, LdbPost *post, Rec *rec )
1886 int cont = rec && rec->len; /* continuing previous terms search */
1887 char *stk = rec ? ((char*)rec + rec->bytes) : 0; /* term stack */
1891 LdbP * const p = post->p; /* really just an alias */
1893 /* check for prefix match */
1896 return log_msg( ERR_INVAL, "bad legacy search on vsz %d", bt->vsz );
1897 prefix = LDB_PFX & post->mode;
1900 return log_msg( ERR_INVAL, "lbt_search needs rec or post" );
1901 if ( (prefix = key->len && '$' == key->byt[key->len - 1]) )
1908 memset( &start, 0, sizeof(start) );
1909 start.len = 255 < rec->field[rec->len-1].len
1910 ? 255 : rec->field[rec->len-1].len;
1911 memcpy( start.byt, rec->field[rec->len-1].val, start.len );
1915 if ( ! start.len ) /* start at 1st leaf */
1916 b = getblk( bt, 1, 0, LBT_RD );
1917 else { /* locate key */
1921 if ( findkey( &f, LBT_RD ) )
1922 LOG_OTO( ouch, ( ERR_TRASH, "could not find key" ) );
1924 i = (unsigned)(LBT_ENTMASK & f.e[0]);
1925 if ( !(LBT_MATCH & f.e[0]) && ! prefix )
1929 for ( ;; i=0 ) { /* blocks */
1930 unsigned char *base = LBT_BASE( b );
1932 if ( ! b || b->lev )
1934 b->lck = LBT_EX; /* lock */
1936 for ( ; i<b->ent; i++, d++ ) { /* entries */
1937 unsigned char *ent = base + LBT_POS( d ); /* start of key */
1938 unsigned short kln = d->kln;
1939 if ( memcmp( key->byt, ent, kln <= key->len ? kln : key->len )
1940 || (prefix ? kln < key->len : kln != key->len )
1943 if ( rec ) { /* record term */
1944 Field *f = rec->field + rec->len; /* field to assign */
1945 /** append term to record, unless it's the same as previous */
1947 if ( kln == f[-1].len && !memcmp( f[-1].val, ent, kln ) )
1949 } else if ( cont ) {
1950 if ( kln == start.len && !memcmp( start.byt, ent, kln ) )
1954 if ( stk < (char*)(f+1) ) /* stack reached field dict */
1956 memcpy( stk, ent, kln );
1965 unsigned short vln, k, m=0;
1967 int f = post->fil-1; /* highest pos to consider in given postings */
1968 if ( !(vln = LBT_VLN( d )) )
1970 ent += kln; /* set to start of values */
1971 c = ent + 8*(vln-1); /* last value */
1972 /* collect postings */
1973 for ( k=vln; k--; c-=8 ) {
1974 /* loop backwards (for the fun of it) postings in segment */
1975 int prow, ptag, ppos;
1976 LdbP e; /* the entry */
1977 LdbP samerow; /* highest possible entry w/ same row as e */
1978 #if defined( LDB_BIG_ENDIAN )
1979 /* the 8 bytes of a posting are BIG ENDIAN ! */
1980 memcpy(e.bytes,c,8);
1982 e.bytes[0] = c[7]; e.bytes[1] = c[6];
1983 e.bytes[2] = c[5]; e.bytes[3] = c[4];
1984 e.bytes[4] = c[3]; e.bytes[5] = c[2];
1985 e.bytes[6] = c[1]; e.bytes[7] = c[0];
1987 prow = LDBP_ROW( &e );
1988 ptag = LDBP_TAG( &e );
1989 ppos = LDBP_POS( &e );
1990 LOG_DBG( LOG_VERBOSE, "post %d.%hd pos %06x", prow, ptag, ppos );
1993 if ( (post->cut && prow >= post->cut)
1994 || (post->tag && post->tag != ptag)
1997 if ( prow < post->skp ) /* quickly bail out on skip condition */
1999 LDBP_SETROWTOP( &samerow, &e ); /* for mfn comparison */
2000 /* sweep down to postings for the same row as e ... */
2001 while ( f >= 0 && LDBP_GT( p+f, &samerow ) )
2003 if ( LDB_AND & post->mode ) {
2005 /* loop postings for same row, mark all (that are near enough) */
2006 LDBP_SETROWBOT( &samerow, &e ); /* for mfn comparison */
2007 /* NOTE: postings for row are GT than bottom even if marked */
2008 for ( l = f; l>=0 && LDBP_GT( p+l, &samerow ); l-- ) {
2011 if ( ptag != LDBP_TAG( p+l ) ) continue;
2012 if ( LDB_NEAR_G != post->near ) {
2013 dist = LDBP_POS( p+l ) - LDBP_POS( &e );
2014 if ( dist < 0 ) dist = -dist;
2017 : -post->near != dist /* exact $$$$ */
2021 LDBP_SETMARK( p+l );
2023 } else { /* OR mode */
2025 if ( ! post->near ) /* add if row not found: ignore details */
2026 add = 0 > f || prow > LDBP_ROW( p+f );
2027 else { /* add if no exact match */
2029 /* NOTE: we don't use mark bit in OR mode, do we ? */
2030 for ( l = f; l>=0 && LDBP_GT( p+l, &e ); l-- )
2032 add = 0 > l || LDBP_GT( &e, p+l );
2037 } /* for postings in segment */
2038 if ( m ) { /* merge in the merge buffer */
2040 for ( k = post->fil += m; m && k--; ) {
2042 if ( k < m || LDBP_GT( mm, &p[k-m] ) ) {
2045 LOG_DBG( LOG_DEBUG, "merging %d at %d", LDBP_ROW(&src), k );
2048 if ( k < post->len )
2050 else { /* set cut */
2051 int row = LDBP_ROW( &src );
2052 if ( row < post->cut || !post->cut )
2056 if ( post->fil > post->len )
2057 post->fil = post->len;
2058 if ( post->cut ) /* postings for cut row are unreliable */
2059 while ( post->fil && post->cut <= LDBP_ROW(p+post->fil-1) )
2063 } /* for entries in block */
2065 if ( ! b->nxt || !(b = getblk(bt, b->nxt, 0, LBT_RD)) )
2078 int lbt_init ( Idx *bt )
2080 int ret = 0, blkbits, i;
2082 int size = lio_size( bt->fd );
2087 for ( i=LBT_LRULEV; i--; )
2088 bt->lru[i] = bt->mru[i] = 0;
2090 if ( 0x3ff & size ) /* not on 1K boundary */
2091 return log_msg( LOG_ERROR, "bt has bad size %dK + %d",
2092 size >> 10, (int)size & 0x3ff );
2093 if ( size ) { /* read root header */
2095 unsigned want = sizeof(b) - LBT_BOFF;
2096 int got = lio_pread( &bt->fd, LBT_BASE(&b), want, 0 );
2097 if ( want != (unsigned)got )
2098 return log_msg( LOG_SYSERR, "could not get root head" );
2100 return log_msg( ERR_TRASH, "root has num %d", b.num );
2101 /*if ( LBT_ISIS != b.nxt ) log_msg( LOG_WARN, "magic is 0x%08x", b.nxt );*/
2102 if ( bt->typ && bt->typ != b.typ )
2103 log_msg( LOG_WARN, "typ want 0x02%x got 0x02%x", bt->typ, b.typ );
2105 if ( bt->key && bt->key != b.key )
2106 log_msg( LOG_WARN, "key want %d got %d", bt->key, b.key );
2108 if ( bt->col && bt->col != b.col )
2109 log_msg( LOG_WARN, "col want %d got %d", bt->col, b.col );
2116 } else { /* new index */
2122 bt->vsz = 8+(0xf & bt->typ);
2123 blkbits = 10 + (3 & (bt->typ >> 4));
2124 bt->bsz = 1 << blkbits;
2125 if ( size % bt->bsz )
2126 return log_msg( ERR_TRASH, "size 0x%08x % blksize", size, bt->bsz );
2127 bt->len = size >> blkbits;
2128 if ( 2 > bt->len && !(LBT_WRITE & bt->flg) )
2129 return log_msg( LOG_SYSERR, "attempt to read empty index" );
2131 bt->hlen = 16 + bt->len / 32; /* initial cache size */
2132 if ( bt->hlen < 1000 ) bt->hlen = 1000;
2133 if ( !(bt->root = addchunk( bt, bt->hlen )) )
2134 return log_msg( LOG_SYSERR, "\twhen getting bt cache" );
2135 if ( bt->hlen < 256 ) /* the hash */
2137 if ( !(bt->hash = (Block**)mAlloc( bt->hlen * sizeof(Block*) )) )
2138 return log_msg( LOG_SYSERR, "\twhen getting bt hash" );
2139 memset( bt->hash, 0, bt->hlen * sizeof(Block*) );
2141 if ( size && readblk( bt->root, bt, 0 ) )
2142 return log_msg( ERR_TRASH, "could not read root" );
2144 /* init: link root to leaf */
2145 SETINT( LBT_BASE(bt->root)+(bt->root->stk -= 4), 1 );
2146 LBT_PVK( bt->root->dct, bt->root->stk, 0, 0 );
2150 if ( !(l = getblk( bt, 1, 0, LBT_EX )) )
2151 return log_msg( ERR_TRASH, "could not read 1st leaf" );
2153 log_msg( LOG_INFO, "creating index bsz %u ksz %u vsz %u",
2154 bt->bsz, bt->key, bt->vsz );
2156 if ( WRULK( bt->root, bt ) || WRULK( l, bt ) )
2157 return log_msg( LOG_SYSERR, "\twhen writing base" );
2164 void lbt_close ( Idx *bt )
2166 Chunk *c, *n = bt->mem;
2172 lio_close( &bt->fd, LIO_INOUT );
2173 memset( bt, 0, sizeof(*bt) );
2177 /* ************************************************************
2182 void cXMkVal ( Idx *bt, Val *val, Hit *hit )
2191 int cXAdd ( Idx *bt, Key *key, Hit *hit )
2195 key = &bt->bat->key;
2196 cXMkVal( bt, &key->val, hit );
2197 return lbt_batchval( bt, key );
2201 cXMkVal( bt, &key->val, hit );
2202 if ( hit->dbn == 0xffff )
2203 return lbt_del( bt, key );
2205 return lbt_add( bt, key );