Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Change locks to use 32-bit shm-sequence-ids instead of 64-bit tree-ids.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: acf38e32c81b8d3fc643c53fe2fa5d849fb8f504
User & Date: dan 2012-09-10 20:07:54.155
Context
2012-09-11
11:47
Fix a memory leak in lsm_unix.c. check-in: bf4758ab15 user: dan tags: trunk
2012-09-10
20:07
Change locks to use 32-bit shm-sequence-ids instead of 64-bit tree-ids. check-in: acf38e32c8 user: dan tags: trunk
18:39
Change the way version races between the in-memory tree and lsm are handled in lsm.wiki. check-in: e12aa10a7c user: dan tags: trunk
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsmInt.h.
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#define array_size(x) (sizeof(x)/sizeof(x[0]))


/* The size of each shared-memory chunk */
#define LSM_SHM_CHUNK_SIZE (32*1024)

/* The number of bytes reserved at the start of each shm chunk for MM. */
#define LSM_SHM_CHUNK_HDR  (3 * 4)

/* The number of available read locks. */
#define LSM_LOCK_NREADER   6

/* Lock definitions */
#define LSM_LOCK_DMS1         1
#define LSM_LOCK_DMS2         2







|







126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#define array_size(x) (sizeof(x)/sizeof(x[0]))


/* The size of each shared-memory chunk */
#define LSM_SHM_CHUNK_SIZE (32*1024)

/* The number of bytes reserved at the start of each shm chunk for MM. */
#define LSM_SHM_CHUNK_HDR  (sizeof(ShmChunk))

/* The number of available read locks. */
#define LSM_LOCK_NREADER   6

/* Lock definitions */
#define LSM_LOCK_DMS1         1
#define LSM_LOCK_DMS2         2
229
230
231
232
233
234
235


236
237

238
239
240
241
242
243
244
245
246
247
248
249
  LogRegion aRegion[3];           /* Log file regions (see docs in lsm_log.c) */
};

/*
** Tree header structure. 
*/
struct TreeHeader {


  u32 iTreeId;                    /* Current tree id */
  u32 iTransId;                   /* Current transaction id */

  u32 iRoot;                      /* Offset of root node in shm file */
  u32 nHeight;                    /* Current height of tree structure */
  u32 iWrite;                     /* Write offset in shm file */
  u32 nChunk;                     /* Number of chunks in shared-memory file */
  u32 iFirst;                     /* First chunk in linked list */
  u32 nByte;                      /* Size of current tree structure in bytes */
  DbLog log;                      /* Current layout of log file */ 
  u32 aCksum[2];                  /* Checksums 1 and 2. */
};

/*
** Database handle structure.







>
>
|

>



<
<







229
230
231
232
233
234
235
236
237
238
239
240
241
242
243


244
245
246
247
248
249
250
  LogRegion aRegion[3];           /* Log file regions (see docs in lsm_log.c) */
};

/*
** Tree header structure. 
*/
struct TreeHeader {
  u32 iUsedShmid;                 /* Id of first shm chunk used by this tree */
  u32 iNextShmid;                 /* Shm-id of next chunk allocated */
  u32 iFirst;                     /* Chunk number of smallest shm-id */
  u32 iTransId;                   /* Current transaction id */
  u32 nChunk;                     /* Number of chunks in shared-memory file */
  u32 iRoot;                      /* Offset of root node in shm file */
  u32 nHeight;                    /* Current height of tree structure */
  u32 iWrite;                     /* Write offset in shm file */


  u32 nByte;                      /* Size of current tree structure in bytes */
  DbLog log;                      /* Current layout of log file */ 
  u32 aCksum[2];                  /* Checksums 1 and 2. */
};

/*
** Database handle structure.
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
*/
#define segmentHasSeparators(pSegment) ((pSegment)->sep.iFirst>0)

/*
** The values that accompany the lock held by a database reader.
*/
struct ShmReader {
  i64 iTreeId;
  i64 iLsmId;
};

/*
** An instance of this structure is stored in the first shared-memory
** page. The shared-memory header.
**







|







377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
*/
#define segmentHasSeparators(pSegment) ((pSegment)->sep.iFirst>0)

/*
** The values that accompany the lock held by a database reader.
*/
struct ShmReader {
  u32 iTreeId;
  i64 iLsmId;
};

/*
** An instance of this structure is stored in the first shared-memory
** page. The shared-memory header.
**
415
416
417
418
419
420
421
422
423
424
425



426
427
428
429
430
431
432
};

/*
** An instance of this structure is stored at the start of each shared-memory
** chunk except the first (which is the header chunk - see above).
*/
struct ShmChunk {
  u32 iFirstTree;
  u32 iLastTree;
  u32 iNext;
};




#define LSM_APPLIST_SZ 4

typedef struct Freelist Freelist;
typedef struct FreelistEntry FreelistEntry;

/*







<
|


>
>
>







416
417
418
419
420
421
422

423
424
425
426
427
428
429
430
431
432
433
434
435
};

/*
** An instance of this structure is stored at the start of each shared-memory
** chunk except the first (which is the header chunk - see above).
*/
struct ShmChunk {

  u32 iShmid;
  u32 iNext;
};

/* Return true if shm-sequence "a" is larger than or equal to "b" */
#define shm_sequence_ge(a, b) (((u32)a-(u32)b) < (1<<30))

#define LSM_APPLIST_SZ 4

typedef struct Freelist Freelist;
typedef struct FreelistEntry FreelistEntry;

/*
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509

/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);
void lsmTreeClear(lsm_db *);
void lsmTreeInit(lsm_db *);

int lsmTreeSize(lsm_db *);
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit);
int lsmTreeBeginTransaction(lsm_db *pDb);
int lsmTreeLoadHeader(lsm_db *pDb);

int lsmTreeInsert(lsm_db *pDb, void *pKey, int nKey, void *pVal, int nVal);







|







498
499
500
501
502
503
504
505
506
507
508
509
510
511
512

/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);
void lsmTreeClear(lsm_db *);
int lsmTreeInit(lsm_db *);

int lsmTreeSize(lsm_db *);
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit);
int lsmTreeBeginTransaction(lsm_db *pDb);
int lsmTreeLoadHeader(lsm_db *pDb);

int lsmTreeInsert(lsm_db *pDb, void *pKey, int nKey, void *pVal, int nVal);
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796

#ifdef LSM_DEBUG
void lsmShmHasLock(lsm_db *db, int iLock, int eOp);
#else
# define lsmShmHasLock(x,y,z)
#endif

int lsmReadlock(lsm_db *, i64 iLsm, i64 iTree);
int lsmReleaseReadlock(lsm_db *);

int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse);
int lsmTreeInUse(lsm_db *db, u32 iLsmId, int *pbInUse);
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId);

int lsmDbMultiProc(lsm_db *);







|







785
786
787
788
789
790
791
792
793
794
795
796
797
798
799

#ifdef LSM_DEBUG
void lsmShmHasLock(lsm_db *db, int iLock, int eOp);
#else
# define lsmShmHasLock(x,y,z)
#endif

int lsmReadlock(lsm_db *, i64 iLsm, u32 iTree);
int lsmReleaseReadlock(lsm_db *);

int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse);
int lsmTreeInUse(lsm_db *db, u32 iLsmId, int *pbInUse);
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId);

int lsmDbMultiProc(lsm_db *);
Changes to src/lsm_log.c.
894
895
896
897
898
899
900
901


902
903
904
905
906
907
908
  int iPass;
  int nJump = 0;                  /* Number of LSM_LOG_JUMP records in pass 0 */
  DbLog *pLog;

  rc = lsmFsOpenLog(pDb->pFS);
  if( rc!=LSM_OK ) return rc;

  lsmTreeInit(pDb);


  pLog = &pDb->treehdr.log;
  lsmCheckpointLogoffset(pDb->pShmhdr->aWorker, pLog);

  logReaderInit(pDb, pLog, 1, &reader);
  lsmStringInit(&buf1, pDb->pEnv);
  lsmStringInit(&buf2, pDb->pEnv);








|
>
>







894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
  int iPass;
  int nJump = 0;                  /* Number of LSM_LOG_JUMP records in pass 0 */
  DbLog *pLog;

  rc = lsmFsOpenLog(pDb->pFS);
  if( rc!=LSM_OK ) return rc;

  rc = lsmTreeInit(pDb);
  if( rc!=LSM_OK ) return rc;

  pLog = &pDb->treehdr.log;
  lsmCheckpointLogoffset(pDb->pShmhdr->aWorker, pLog);

  logReaderInit(pDb, pLog, 1, &reader);
  lsmStringInit(&buf1, pDb->pEnv);
  lsmStringInit(&buf2, pDb->pEnv);

Changes to src/lsm_shared.c.
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
  }
}

LsmFile *lsmDbRecycleFd(lsm_db *db){
  LsmFile *pRet;
  Database *p = db->pDatabase;
  lsmMutexEnter(db->pEnv, p->pClientMutex);
  if( pRet = p->pLsmFile ){
    p->pLsmFile = pRet->pNext;
  }
  lsmMutexLeave(db->pEnv, p->pClientMutex);
  return pRet;
}

/*







|







338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
  }
}

LsmFile *lsmDbRecycleFd(lsm_db *db){
  LsmFile *pRet;
  Database *p = db->pDatabase;
  lsmMutexEnter(db->pEnv, p->pClientMutex);
  if( (pRet = p->pLsmFile)!=0 ){
    p->pLsmFile = pRet->pNext;
  }
  lsmMutexLeave(db->pEnv, p->pClientMutex);
  return pRet;
}

/*
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677

    /* Take a read-lock on the tree and snapshot just loaded. Then check
    ** that the shared-memory still contains the same values. If so, proceed.
    ** Otherwise, relinquish the read-lock and retry the whole procedure
    ** (starting with loading the in-memory tree header).  */
    if( rc==LSM_OK ){
      ShmHeader *pShm = pDb->pShmhdr;
      i64 iTree = pDb->treehdr.iTreeId;
      i64 iSnap = lsmCheckpointId(pDb->aSnapshot, 0);
      rc = lsmReadlock(pDb, iSnap, iTree);
      if( rc==LSM_OK ){
        if( (i64)pShm->hdr1.iTreeId==iTree 
         && pShm->hdr1.iTransId==pDb->treehdr.iTransId
         && lsmCheckpointId(pShm->aClient, 0)==iSnap
        ){
          /* Read lock has been successfully obtained. Deserialize the 
          ** checkpoint just loaded. TODO: This will be removed after 
          ** lsm_sorted.c is changed to work directly from the serialized
          ** version of the snapshot.  */
          rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );







|

|

<
|
|







657
658
659
660
661
662
663
664
665
666
667

668
669
670
671
672
673
674
675
676

    /* Take a read-lock on the tree and snapshot just loaded. Then check
    ** that the shared-memory still contains the same values. If so, proceed.
    ** Otherwise, relinquish the read-lock and retry the whole procedure
    ** (starting with loading the in-memory tree header).  */
    if( rc==LSM_OK ){
      ShmHeader *pShm = pDb->pShmhdr;
      u32 iShmchunk = pDb->treehdr.iUsedShmid;
      i64 iSnap = lsmCheckpointId(pDb->aSnapshot, 0);
      rc = lsmReadlock(pDb, iSnap, iShmchunk);
      if( rc==LSM_OK ){

        if( 0==memcmp(pShm->hdr1.aCksum, pDb->treehdr.aCksum, sizeof(u32)*2)
         && iSnap==lsmCheckpointId(pShm->aClient, 0)
        ){
          /* Read lock has been successfully obtained. Deserialize the 
          ** checkpoint just loaded. TODO: This will be removed after 
          ** lsm_sorted.c is changed to work directly from the serialized
          ** version of the snapshot.  */
          rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
#endif

/*
** Obtain a read-lock on database version identified by the combination
** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or
** an LSM error code otherwise.
*/
int lsmReadlock(lsm_db *db, i64 iLsm, i64 iTree){
  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  assert( db->iReader<0 );

  /* Search for an exact match. */







|







785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
#endif

/*
** Obtain a read-lock on database version identified by the combination
** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or
** an LSM error code otherwise.
*/
int lsmReadlock(lsm_db *db, i64 iLsm, u32 iTree){
  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  assert( db->iReader<0 );

  /* Search for an exact match. */
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845











846
847
848
849
850
851
852
853



854
855
856
857

858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
      if( rc==LSM_OK ) db->iReader = i;
    }
  }

  /* Search for any usable slot */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId && p->iTreeId && p->iLsmId<=iLsm && p->iTreeId<=iTree ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      if( rc==LSM_OK ){
        if( p->iLsmId && p->iTreeId && p->iLsmId<=iLsm && p->iTreeId<=iTree ){
          db->iReader = i;
        }
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  return rc;
}












static int isInUse(lsm_db *db, i64 iLsm, i64 iTree, int *pbInUse){
  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId && p->iTreeId && (p->iTreeId<=iTree || p->iLsmId<=iLsm) ){



      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
      if( rc==LSM_OK ){
        p->iTreeId = p->iLsmId = 0;
        lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);

      }
    }
  }

  if( rc==LSM_BUSY ){
    *pbInUse = 1;
    return LSM_OK;
  }
  *pbInUse = 0;
  return rc;
}

int lsmTreeInUse(lsm_db *db, u32 iTreeId, int *pbInUse){
  if( db->treehdr.iTreeId==iTreeId ){
    *pbInUse = 1;
    return LSM_OK;
  }
  return isInUse(db, 0, (i64)iTreeId, pbInUse);
}

int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse){
  if( db->pClient && db->pClient->iId<=iLsmId ){
    *pbInUse = 1;
    return LSM_OK;
  }







|


|











>
>
>
>
>
>
>
>
>
>
>
|






|
>
>
>
|
|
|
|
>












|
|



|







823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
      if( rc==LSM_OK ) db->iReader = i;
    }
  }

  /* Search for any usable slot */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId && p->iLsmId<=iLsm && shm_sequence_ge(iTree, p->iTreeId) ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      if( rc==LSM_OK ){
        if( p->iLsmId && p->iLsmId<=iLsm && shm_sequence_ge(iTree,p->iTreeId) ){
          db->iReader = i;
        }
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  return rc;
}

/*
** This is used to check if there exists a read-lock locking a particular
** version of either the in-memory tree or database file. 
**
** If iLsmId is non-zero, then it is a snapshot id. If there exists a 
** read-lock using this snapshot or newer, set *pbInUse to true. Or,
** if there is no such read-lock, set it to false.
**
** Or, if iLsmId is zero, then iShmid is a shared-memory sequence id.
** Search for a read-lock using this sequence id or newer. etc.
*/
static int isInUse(lsm_db *db, i64 iLsmId, u32 iShmid, int *pbInUse){
  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId ){
      if( (iLsmId!=0 && iLsmId>=p->iLsmId) 
       || (iLsmId==0 && shm_sequence_ge(p->iTreeId, iShmid))
      ){
        rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
        if( rc==LSM_OK ){
          p->iLsmId = 0;
          lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
        }
      }
    }
  }

  if( rc==LSM_BUSY ){
    *pbInUse = 1;
    return LSM_OK;
  }
  *pbInUse = 0;
  return rc;
}

int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){
  if( db->treehdr.iUsedShmid==iShmid ){
    *pbInUse = 1;
    return LSM_OK;
  }
  return isInUse(db, 0, iShmid, pbInUse);
}

int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse){
  if( db->pClient && db->pClient->iId<=iLsmId ){
    *pbInUse = 1;
    return LSM_OK;
  }
Changes to src/lsm_tree.c.
270
271
272
273
274
275
276




277
278
279
280
281
282
283
  return 0;
}

static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
  int rcdummy = LSM_OK;
  return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcdummy);
}





/* Values for the third argument to treeShmkey(). */
#define TK_LOADKEY  1
#define TK_LOADVAL  2

static TreeKey *treeShmkey(
  lsm_db *pDb,                    /* Database handle */







>
>
>
>







270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
  return 0;
}

static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
  int rcdummy = LSM_OK;
  return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcdummy);
}

static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){
  return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, pRc);
}

/* Values for the third argument to treeShmkey(). */
#define TK_LOADKEY  1
#define TK_LOADVAL  2

static TreeKey *treeShmkey(
  lsm_db *pDb,                    /* Database handle */
529
530
531
532
533
534
535

536
537


538
539
540
541

542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559










560
561
562
563
564
565
566
567
568
569
570
    assert( iWrite );
    iChunk = treeOffsetToChunk(iWrite-1);
    iEof = (iChunk+1) * CHUNK_SIZE;
    assert( iEof>=iWrite && (iEof-iWrite)<CHUNK_SIZE );
    if( (iWrite+nByte)>iEof ){
      ShmChunk *pHdr;           /* Header of chunk just finished (iChunk) */
      ShmChunk *pFirst;         /* Header of chunk treehdr.iFirst */

      int iNext = 0;            /* Next chunk */
      int rc;



      /* Check if the chunk at the start of the linked list is still in
      ** use. If not, reuse it. If so, allocate a new chunk by appending
      ** to the *-shm file.  */

      if( pDb->treehdr.iFirst!=iChunk ){
        int bInUse;
        pFirst = treeShmChunk(pDb, pDb->treehdr.iFirst);
        rc = lsmTreeInUse(pDb, pFirst->iLastTree, &bInUse);
        if( rc!=LSM_OK ){
          *pRc = rc;
          return 0;
        }
        if( bInUse==0 ){
          iNext = pDb->treehdr.iFirst;
          pDb->treehdr.iFirst = pFirst->iNext;
          pFirst->iNext = 0;
          pFirst->iLastTree = 0;
          assert( pDb->treehdr.iFirst );
          assert( pFirst->iLastTree<pDb->treehdr.iTreeId );
        }
      }
      if( iNext==0 ) iNext = pDb->treehdr.nChunk++;











      /* Set the header values for the chunk just finished */
      pHdr = (ShmChunk *)treeShmptr(pDb, iChunk*CHUNK_SIZE, pRc);
      pHdr->iLastTree = pDb->treehdr.iTreeId;
      pHdr->iNext = iNext;

      /* Advance to the next chunk */
      iWrite = iNext * CHUNK_SIZE + CHUNK_HDR;
    }

    /* Allocate space at iWrite. */







>

|
>
>




>
|

<
|







<
<

<



>
>
>
>
>
>
>
>
>
>



<







533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551

552
553
554
555
556
557
558
559


560

561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576

577
578
579
580
581
582
583
    assert( iWrite );
    iChunk = treeOffsetToChunk(iWrite-1);
    iEof = (iChunk+1) * CHUNK_SIZE;
    assert( iEof>=iWrite && (iEof-iWrite)<CHUNK_SIZE );
    if( (iWrite+nByte)>iEof ){
      ShmChunk *pHdr;           /* Header of chunk just finished (iChunk) */
      ShmChunk *pFirst;         /* Header of chunk treehdr.iFirst */
      ShmChunk *pNext;          /* Header of new chunk */
      int iNext = 0;            /* Next chunk */
      int rc = LSM_OK;

      pFirst = treeShmChunk(pDb, pDb->treehdr.iFirst);

      /* Check if the chunk at the start of the linked list is still in
      ** use. If not, reuse it. If so, allocate a new chunk by appending
      ** to the *-shm file.  */
      assert( shm_sequence_ge(pDb->treehdr.iUsedShmid, pFirst->iShmid) );
      if( pDb->treehdr.iUsedShmid!=pFirst->iShmid ){
        int bInUse;

        rc = lsmTreeInUse(pDb, pFirst->iShmid, &bInUse);
        if( rc!=LSM_OK ){
          *pRc = rc;
          return 0;
        }
        if( bInUse==0 ){
          iNext = pDb->treehdr.iFirst;
          pDb->treehdr.iFirst = pFirst->iNext;


          assert( pDb->treehdr.iFirst );

        }
      }
      if( iNext==0 ) iNext = pDb->treehdr.nChunk++;

      /* Set the header values for the new chunk */
      pNext = treeShmChunkRc(pDb, iNext, &rc);
      if( pNext ){
        pNext->iNext = 0;
        pNext->iShmid = (pDb->treehdr.iNextShmid++);
      }else{
        *pRc = rc;
        return 0;
      }

      /* Set the header values for the chunk just finished */
      pHdr = (ShmChunk *)treeShmptr(pDb, iChunk*CHUNK_SIZE, pRc);

      pHdr->iNext = iNext;

      /* Advance to the next chunk */
      iWrite = iNext * CHUNK_SIZE + CHUNK_HDR;
    }

    /* Allocate space at iWrite. */
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965



966
967
968
969
970








971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
  return rc;
}

/*
** Empty the contents of the in-memory tree.
*/
void lsmTreeClear(lsm_db *pDb){
  pDb->treehdr.iTreeId++;
  pDb->treehdr.iTransId = 1;
  pDb->treehdr.iRoot = 0;
  pDb->treehdr.nHeight = 0;
  pDb->treehdr.nByte = 0;
}

/*
** This function is called during recovery to initialize the 
** tree header. Only the database connections private copy of the tree-header
** is initialized here - it will be copied into shared memory if log file
** recovery is successful.
*/
void lsmTreeInit(lsm_db *pDb){



  pDb->treehdr.iTransId = 1;
  pDb->treehdr.iFirst = 1;
  pDb->treehdr.nChunk = 2;
  pDb->treehdr.iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR;
  pDb->treehdr.iTreeId = 1;








}

/*
** Insert a new entry into the in-memory tree.
**
** If the value of the 5th parameter, nVal, is negative, then a delete-marker
** is inserted into the tree. In this case the value pointer, pVal, must be
** NULL.
*/
int lsmTreeInsert(
  lsm_db *pDb,                    /* Database handle */
  void *pKey,                     /* Pointer to key data */
  int nKey,                       /* Size of key data in bytes */
  void *pVal,                     /* Pointer to value data (or NULL) */
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int rc = LSM_OK;                /* Return Code */
  TreeKey *pTreeKey;              /* New key-value being inserted */
  int nTreeKey;                   /* Number of bytes allocated at pTreeKey */
  u32 iTreeKey;
  u8 *a;
  TreeHeader *pHdr = &pDb->treehdr;

  assert( nVal>=0 || pVal==0 );
  assert_tree_looks_ok(LSM_OK, pTree);
#if 0
  dump_tree_contents(pDb, "before");
#endif







<












|
>
>
>




|
>
>
>
>
>
>
>
>


















<

<







958
959
960
961
962
963
964

965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011

1012

1013
1014
1015
1016
1017
1018
1019
  return rc;
}

/*
** Empty the contents of the in-memory tree.
*/
void lsmTreeClear(lsm_db *pDb){

  pDb->treehdr.iTransId = 1;
  pDb->treehdr.iRoot = 0;
  pDb->treehdr.nHeight = 0;
  pDb->treehdr.nByte = 0;
}

/*
** This function is called during recovery to initialize the 
** tree header. Only the database connections private copy of the tree-header
** is initialized here - it will be copied into shared memory if log file
** recovery is successful.
*/
int lsmTreeInit(lsm_db *pDb){
  ShmChunk *pOne;
  int rc = LSM_OK;

  pDb->treehdr.iTransId = 1;
  pDb->treehdr.iFirst = 1;
  pDb->treehdr.nChunk = 2;
  pDb->treehdr.iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR;
  pDb->treehdr.iNextShmid = 2;
  pDb->treehdr.iUsedShmid = 1;

  pOne = treeShmChunkRc(pDb, 1, &rc);
  if( pOne ){
    pOne->iNext = 0;
    pOne->iShmid = 1;
  }
  return rc;
}

/*
** Insert a new entry into the in-memory tree.
**
** If the value of the 5th parameter, nVal, is negative, then a delete-marker
** is inserted into the tree. In this case the value pointer, pVal, must be
** NULL.
*/
int lsmTreeInsert(
  lsm_db *pDb,                    /* Database handle */
  void *pKey,                     /* Pointer to key data */
  int nKey,                       /* Size of key data in bytes */
  void *pVal,                     /* Pointer to value data (or NULL) */
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int rc = LSM_OK;                /* Return Code */
  TreeKey *pTreeKey;              /* New key-value being inserted */

  u32 iTreeKey;

  TreeHeader *pHdr = &pDb->treehdr;

  assert( nVal>=0 || pVal==0 );
  assert_tree_looks_ok(LSM_OK, pTree);
#if 0
  dump_tree_contents(pDb, "before");
#endif
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466

1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491


1492
1493
1494

1495
1496
1497

1498
1499
1500
1501
1502
1503
1504
** contents back to their current state.
*/
void lsmTreeMark(lsm_db *pDb, TreeMark *pMark){
  pMark->iRoot = pDb->treehdr.iRoot;
  pMark->nHeight = pDb->treehdr.nHeight;
  pMark->iWrite = pDb->treehdr.iWrite;
  pMark->nChunk = pDb->treehdr.nChunk;
  pMark->iFirst = pDb->treehdr.iFirst;
  pMark->iRollback = intArraySize(&pDb->rollback);
}

/*
** Roll back to mark pMark. Structure *pMark should have been previously
** populated by a call to lsmTreeMark().
*/
void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark){
  int rcdummy = LSM_OK;
  int iIdx;
  int nIdx;
  u32 iNext;
  ShmChunk *pChunk;
  u32 iChunk;


  /* Revert all required v2 pointers. */
  nIdx = intArraySize(&pDb->rollback);
  for(iIdx = pMark->iRollback; iIdx<nIdx; iIdx++){
    TreeNode *pNode;
    pNode = treeShmptr(pDb, intArrayEntry(&pDb->rollback, iIdx), &rcdummy);
    assert( pNode && rcdummy==LSM_OK );
    pNode->iV2 = 0;
    pNode->iV2Child = 0;
    pNode->iV2Ptr = 0;
  }
  intArrayTruncate(&pDb->rollback, pMark->iRollback);

  /* Restore the free-chunk list */
  assert( pMark->iWrite!=0 );
  iChunk = treeOffsetToChunk(pMark->iWrite-1);
  pChunk = treeShmChunk(pDb, iChunk);
  iNext = pChunk->iNext;
  pChunk->iNext = 0;
  assert( iNext==0 
       || pDb->treehdr.iFirst==pMark->iFirst 
       || iNext==pMark->iFirst 
  );
  pDb->treehdr.iFirst = pMark->iFirst;
  while( iNext ){


    iChunk = iNext;
    pChunk = treeShmChunk(pDb, iChunk);
    iNext = pChunk->iNext;

    if( iChunk<pMark->nChunk ){
      pChunk->iNext = pDb->treehdr.iFirst;
      pChunk->iLastTree = 0;

    }
  }

  /* Restore the tree-header fields */
  pDb->treehdr.iRoot = pMark->iRoot;
  pDb->treehdr.nHeight = pMark->nHeight;
  pDb->treehdr.iWrite = pMark->iWrite;







<














>













|





|
|
|
|
<

>
>
|
|
|
>
|
|
|
>







1466
1467
1468
1469
1470
1471
1472

1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510

1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
** contents back to their current state.
*/
void lsmTreeMark(lsm_db *pDb, TreeMark *pMark){
  pMark->iRoot = pDb->treehdr.iRoot;
  pMark->nHeight = pDb->treehdr.nHeight;
  pMark->iWrite = pDb->treehdr.iWrite;
  pMark->nChunk = pDb->treehdr.nChunk;

  pMark->iRollback = intArraySize(&pDb->rollback);
}

/*
** Roll back to mark pMark. Structure *pMark should have been previously
** populated by a call to lsmTreeMark().
*/
void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark){
  int rcdummy = LSM_OK;
  int iIdx;
  int nIdx;
  u32 iNext;
  ShmChunk *pChunk;
  u32 iChunk;
  u32 iShmid;

  /* Revert all required v2 pointers. */
  nIdx = intArraySize(&pDb->rollback);
  for(iIdx = pMark->iRollback; iIdx<nIdx; iIdx++){
    TreeNode *pNode;
    pNode = treeShmptr(pDb, intArrayEntry(&pDb->rollback, iIdx), &rcdummy);
    assert( pNode && rcdummy==LSM_OK );
    pNode->iV2 = 0;
    pNode->iV2Child = 0;
    pNode->iV2Ptr = 0;
  }
  intArrayTruncate(&pDb->rollback, pMark->iRollback);

  /* Restore the free-chunk list. */
  assert( pMark->iWrite!=0 );
  iChunk = treeOffsetToChunk(pMark->iWrite-1);
  pChunk = treeShmChunk(pDb, iChunk);
  iNext = pChunk->iNext;
  pChunk->iNext = 0;

  pChunk = treeShmChunk(pDb, pDb->treehdr.iFirst);
  iShmid = pChunk->iShmid-1;


  while( iNext ){
    u32 iFree = iNext;            /* Current chunk being rollback-freed */
    ShmChunk *pFree;              /* Pointer to chunk iFree */

    pFree = treeShmChunk(pDb, iFree);
    iNext = pFree->iNext;

    if( iFree<pMark->nChunk ){
      pFree->iNext = pDb->treehdr.iFirst;
      pFree->iShmid = iShmid--;
      pDb->treehdr.iFirst = iFree;
    }
  }

  /* Restore the tree-header fields */
  pDb->treehdr.iRoot = pMark->iRoot;
  pDb->treehdr.nHeight = pMark->nHeight;
  pDb->treehdr.iWrite = pMark->iWrite;