SQLite4
Check-in [dc91e55841]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix some of the problems with very large free-block lists.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | multi-process
Files: files | file ages | folders
SHA1: dc91e55841067ecf7dcfa49a7a45b3487dd3c300
User & Date: dan 2012-08-29 19:45:14
Context
2012-08-30
18:01
Fix memory leaks. Add the LSM_CONFIG_MAX_FREELIST parameter to make testing free-list overflow easier. check-in: 3e1ecb95c9 user: dan tags: multi-process
2012-08-29
19:45
Fix some of the problems with very large free-block lists. check-in: dc91e55841 user: dan tags: multi-process
2012-08-28
19:06
Limit the number of levels that may exist within the database file. check-in: 77b1401c13 user: dan tags: multi-process
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/kvlsm.c.

438
439
440
441
442
443
444







445
446
447
448
449
450









451
452
453
454
455
456
457
458
459
460
461
462
463
  KVLsm *pNew;
  int rc = SQLITE4_OK;

  pNew = (KVLsm *)sqlite4_malloc(pEnv, sizeof(KVLsm));
  if( pNew==0 ){
    rc = SQLITE4_NOMEM;
  }else{







    memset(pNew, 0, sizeof(KVLsm));
    pNew->base.pStoreVfunc = &kvlsmMethods;
    pNew->base.pEnv = pEnv;

    rc = lsm_new(0, &pNew->pDb);
    if( rc==SQLITE4_OK ){









      rc = lsm_open(pNew->pDb, zName);
    }

    if( rc!=SQLITE4_OK ){
      lsm_close(pNew->pDb);
      sqlite4_free(pEnv, pNew);
      pNew = 0;
    }
  }

  *ppKVStore = (KVStore*)pNew;
  return rc;
}







>
>
>
>
>
>
>



<


>
>
>
>
>
>
>
>
>













438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454

455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
  KVLsm *pNew;
  int rc = SQLITE4_OK;

  pNew = (KVLsm *)sqlite4_malloc(pEnv, sizeof(KVLsm));
  if( pNew==0 ){
    rc = SQLITE4_NOMEM;
  }else{
    struct Config {
      const char *zParam;
      int eParam;
    } aConfig[] = {
      { "lsm_block_size", LSM_CONFIG_BLOCK_SIZE }
    };

    memset(pNew, 0, sizeof(KVLsm));
    pNew->base.pStoreVfunc = &kvlsmMethods;
    pNew->base.pEnv = pEnv;

    rc = lsm_new(0, &pNew->pDb);
    if( rc==SQLITE4_OK ){
      int i;
      for(i=0; i<ArraySize(aConfig); i++){
        const char *zVal = sqlite4_uri_parameter(zName, aConfig[i].zParam);
        if( zVal ){
          int nVal = sqlite4Atoi(zVal);
          lsm_config(pNew->pDb, aConfig[i].eParam, &nVal);
        }
      }

      rc = lsm_open(pNew->pDb, zName);
    }

    if( rc!=SQLITE4_OK ){
      lsm_close(pNew->pDb);
      sqlite4_free(pEnv, pNew);
      pNew = 0;
    }
  }

  *ppKVStore = (KVStore*)pNew;
  return rc;
}

Changes to src/lsmInt.h.

445
446
447
448
449
450
451


452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
...
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
...
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
...
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
...
756
757
758
759
760
761
762


763
764
765
766
767
768
769
...
781
782
783
784
785
786
787

788
789
790
791
792
793
794
  Level *pLevel;                  /* Pointer to level 0 of snapshot (or NULL) */
  i64 iId;                        /* Snapshot id */

  /* Used by worker snapshots only */
  int nBlock;                     /* Number of blocks in database file */
  u32 aiAppend[LSM_APPLIST_SZ];   /* Append point list */
  Freelist freelist;              /* Free block list */


  int nFreelistDelta;
  int bRecordDelta;
};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *);
int lsmCheckpointExport(lsm_db *, int, int, int, i64, int, void **, int *);
int lsmCheckpointLevels(lsm_db *, int, void **, int *);
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);
int lsmCheckpointOverflow(lsm_db *pDb, int *pnLsmLevel);

int lsmCheckpointRecover(lsm_db *);
int lsmCheckpointDeserialize(lsm_db *, u32 *, Snapshot **);

int lsmCheckpointLoad(lsm_db *pDb);
int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);

i64 lsmCheckpointId(u32 *, int);
i64 lsmCheckpointLogOffset(u32 *);
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);

int lsmCheckpointSaveWorker(lsm_db *pDb, int);
int lsmDatabaseFull(lsm_db *pDb);


/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
................................................................................
int lsmFsNRead(FileSystem *);
int lsmFsNWrite(FileSystem *);

int lsmFsMetaPageGet(FileSystem *, int, int, MetaPage **);
int lsmFsMetaPageRelease(MetaPage *);
u8 *lsmFsMetaPageData(MetaPage *, int *);

#ifdef LSM_EXPENSIVE_DEBUG
int lsmFsIntegrityCheck(lsm_db *);
#else
/* # define lsmFsIntegrityCheck(pDb) 1 */
#endif

int lsmFsPageWritable(Page *);

/* Functions to read, write and sync the log file. */
int lsmFsWriteLog(FileSystem *pFS, i64 iOff, LsmString *pStr);
int lsmFsSyncLog(FileSystem *pFS);
................................................................................
** End of functions from "lsm_file.c".
**************************************************************************/

/* 
** Functions from file "lsm_sorted.c".
*/
int lsmInfoPageDump(lsm_db *, Pgno, int, char **);
int lsmSortedFlushTree(lsm_db *, int, int);
void lsmSortedCleanup(lsm_db *);
int lsmSortedAutoWork(lsm_db *, int nUnit);

void lsmSortedRemap(lsm_db *pDb);

void lsmSortedFreeLevel(lsm_env *pEnv, Level *);

int lsmSortedFlushDb(lsm_db *);
int lsmSortedAdvanceAll(lsm_db *pDb);

int lsmSortedLoadMerge(lsm_db *, Level *, u32 *, int *);

int lsmSortedLoadSystem(lsm_db *pDb);

void *lsmSortedSplitKey(Level *pLevel, int *pnByte);

void lsmSortedSaveTreeCursors(lsm_db *);

int lsmMCursorNew(lsm_db *, MultiCursor **);
void lsmMCursorClose(MultiCursor *);
................................................................................

int lsmBeginRecovery(lsm_db *);
int lsmBeginReadTrans(lsm_db *);
int lsmBeginWriteTrans(lsm_db *);
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
void lsmFinishWork(lsm_db *, int, int *);

int lsmFinishRecovery(lsm_db *);
void lsmFinishReadTrans(lsm_db *);
int lsmFinishWriteTrans(lsm_db *, int);
int lsmFinishFlush(lsm_db *, int);

int lsmDbUpdateClient(lsm_db *, int, int);

int lsmSnapshotFreelist(lsm_db *, int **, int *);
int lsmSnapshotSetFreelist(lsm_db *, int *, int);

Snapshot *lsmDbSnapshotClient(lsm_db *);
Snapshot *lsmDbSnapshotWorker(lsm_db *);
void lsmDbSnapshotRelease(lsm_env *pEnv, Snapshot *);

................................................................................
void lsmFreelistDeltaEnd(lsm_db *);
int lsmFreelistDelta(lsm_db *pDb);

DbLog *lsmDatabaseLog(lsm_db *pDb);

#ifdef LSM_DEBUG
  int lsmHoldingClientMutex(lsm_db *pDb);


#endif

void lsmFreeSnapshot(lsm_env *, Snapshot *);


/* Candidate values for the 3rd argument to lsmShmLock() */
#define LSM_LOCK_UNLOCK 0
................................................................................
#endif

int lsmReadlock(lsm_db *, i64 iLsm, i64 iTree);
int lsmReleaseReadlock(lsm_db *);

int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse);
int lsmTreeInUse(lsm_db *db, u32 iLsmId, int *pbInUse);



/**************************************************************************
** functions in lsm_str.c
*/
void lsmStringInit(LsmString*, lsm_env *pEnv);
int lsmStringExtend(LsmString*, int);







>
>









|


|


|












|







 







|

<
<







 







|











<
|







 







|






<
<







 







>
>







 







>







445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
...
604
605
606
607
608
609
610
611
612


613
614
615
616
617
618
619
...
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650

651
652
653
654
655
656
657
658
...
719
720
721
722
723
724
725
726
727
728
729
730
731
732


733
734
735
736
737
738
739
...
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
...
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
  Level *pLevel;                  /* Pointer to level 0 of snapshot (or NULL) */
  i64 iId;                        /* Snapshot id */

  /* Used by worker snapshots only */
  int nBlock;                     /* Number of blocks in database file */
  u32 aiAppend[LSM_APPLIST_SZ];   /* Append point list */
  Freelist freelist;              /* Free block list */
  int nFreelistOvfl;              /* Number of extra free-list entries in LSM */

  int nFreelistDelta;
  int bRecordDelta;
};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *);
int lsmCheckpointExport(lsm_db *, int, int, i64, int, void **, int *);
int lsmCheckpointLevels(lsm_db *, int, void **, int *);
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);
int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *);

int lsmCheckpointRecover(lsm_db *);
int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);

int lsmCheckpointLoad(lsm_db *pDb);
int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);

i64 lsmCheckpointId(u32 *, int);
i64 lsmCheckpointLogOffset(u32 *);
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);

int lsmCheckpointSaveWorker(lsm_db *pDb, int, int);
int lsmDatabaseFull(lsm_db *pDb);


/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
................................................................................
int lsmFsNRead(FileSystem *);
int lsmFsNWrite(FileSystem *);

int lsmFsMetaPageGet(FileSystem *, int, int, MetaPage **);
int lsmFsMetaPageRelease(MetaPage *);
u8 *lsmFsMetaPageData(MetaPage *, int *);

#ifdef LSM_DEBUG
int lsmFsIntegrityCheck(lsm_db *);


#endif

int lsmFsPageWritable(Page *);

/* Functions to read, write and sync the log file. */
int lsmFsWriteLog(FileSystem *pFS, i64 iOff, LsmString *pStr);
int lsmFsSyncLog(FileSystem *pFS);
................................................................................
** End of functions from "lsm_file.c".
**************************************************************************/

/* 
** Functions from file "lsm_sorted.c".
*/
int lsmInfoPageDump(lsm_db *, Pgno, int, char **);
int lsmSortedFlushTree(lsm_db *, int *);
void lsmSortedCleanup(lsm_db *);
int lsmSortedAutoWork(lsm_db *, int nUnit);

void lsmSortedRemap(lsm_db *pDb);

void lsmSortedFreeLevel(lsm_env *pEnv, Level *);

int lsmSortedFlushDb(lsm_db *);
int lsmSortedAdvanceAll(lsm_db *pDb);

int lsmSortedLoadMerge(lsm_db *, Level *, u32 *, int *);

int lsmSortedLoadFreelist(lsm_db *pDb, void **, int *);

void *lsmSortedSplitKey(Level *pLevel, int *pnByte);

void lsmSortedSaveTreeCursors(lsm_db *);

int lsmMCursorNew(lsm_db *, MultiCursor **);
void lsmMCursorClose(MultiCursor *);
................................................................................

int lsmBeginRecovery(lsm_db *);
int lsmBeginReadTrans(lsm_db *);
int lsmBeginWriteTrans(lsm_db *);
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
void lsmFinishWork(lsm_db *, int, int, int *);

int lsmFinishRecovery(lsm_db *);
void lsmFinishReadTrans(lsm_db *);
int lsmFinishWriteTrans(lsm_db *, int);
int lsmFinishFlush(lsm_db *, int);



int lsmSnapshotFreelist(lsm_db *, int **, int *);
int lsmSnapshotSetFreelist(lsm_db *, int *, int);

Snapshot *lsmDbSnapshotClient(lsm_db *);
Snapshot *lsmDbSnapshotWorker(lsm_db *);
void lsmDbSnapshotRelease(lsm_env *pEnv, Snapshot *);

................................................................................
void lsmFreelistDeltaEnd(lsm_db *);
int lsmFreelistDelta(lsm_db *pDb);

DbLog *lsmDatabaseLog(lsm_db *pDb);

#ifdef LSM_DEBUG
  int lsmHoldingClientMutex(lsm_db *pDb);
  int lsmShmAssertLock(lsm_db *db, int iLock, int eOp);
  int lsmShmAssertWorker(lsm_db *db);
#endif

void lsmFreeSnapshot(lsm_env *, Snapshot *);


/* Candidate values for the 3rd argument to lsmShmLock() */
#define LSM_LOCK_UNLOCK 0
................................................................................
#endif

int lsmReadlock(lsm_db *, i64 iLsm, i64 iTree);
int lsmReleaseReadlock(lsm_db *);

int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse);
int lsmTreeInUse(lsm_db *db, u32 iLsmId, int *pbInUse);
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId);


/**************************************************************************
** functions in lsm_str.c
*/
void lsmStringInit(LsmString*, lsm_env *pEnv);
int lsmStringExtend(LsmString*, int);

Changes to src/lsm_ckpt.c.

32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
..
85
86
87
88
89
90
91


92
93
94














95
96
97
98
99

100
101
102
103
104





105
106
107
108
109
110
111




112
113
114
115




116
117


118

119
120
121
122
123
124
125
...
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
...
354
355
356
357
358
359
360
361
362
363
364
365
366





367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
...
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
...
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691





















692
693
694
695
696
697
698
...
887
888
889
890
891
892
893

894
895
896
897
898
899
900
901
902
903

904







905
906
907
908

909
910
911
912
913
914
915
...
924
925
926
927
928
929
930


931
932
933
934
935
936
937
938

939
940
941
942
943

944
945
946
947
948
949
950
...
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
**     2. The checkpoint id LSW.
**     3. The number of integer values in the entire checkpoint, including 
**        the two checksum values.
**     4. The total number of blocks in the database.
**     5. The block size.
**     6. The number of levels.
**     7. The nominal database page size.
**     8. Flag indicating if overflow records are used. If true, the top-level
**        segment contains LEVELS and FREELIST entries. 
**
**   Log pointer:
**
**     4 integers (2 for a 64-bit offset and 2 for a 64-bit checksum). See 
**     ckptExportLog() and ckptImportLog().
**
**   Append points:
................................................................................
**     1. First page of array,
**     2. Last page of array,
**     3. Root page of array (or 0),
**     4. Size of array in pages,
*/

/*


** A limit on the number of rhs segments that may be present in the database
** file. Defining this limit ensures that all level records fit within
** the 4096 byte limit for checkpoint blobs.














*/
#define LSM_CKPT_MAX_LEVELS 40

/*
** OVERSIZED CHECKPOINT BLOBS:

**
** There are two slots allocated for checkpoints at the start of each
** database file. Each is 4096 bytes in size, so may accommodate
** checkpoints that consist of up to 1024 32-bit integers. Normally,
** this is enough.





**
** However, if a database contains a sufficiently large number of levels,
** a checkpoint may exceed 1024 integers in size. In most circumstances this 
** is an undesirable scenario, as a database with so many levels will be 
** slow to query. If this does happen, then only the uppermost (more recent)
** levels are stored in the checkpoint blob itself. The remainder are stored
** in an LSM record with the system key "LEVELS". The payload of the entry




** is a series of 32-bit big-endian integers, as follows:
**
**    1. Number of levels (stored in the LEVELS record, not total).
**    2. For each level, a "level record" (as described above).




**
** There is no checksum in the LEVELS record.


*/


/*
** The argument to this macro must be of type u32. On a little-endian
** architecture, it returns the u32 value that results from interpreting
** the 4 bytes as a big-endian value. On a big-endian architecture, it
** returns the value that would be produced by interpreting the 4 bytes
** of the input value as a little-endian integer.
................................................................................
    ckptSetValue(p, iOut++, aiAppend[i], pRc);
  }
  *piOut = iOut;
};

int lsmCheckpointExport( 
  lsm_db *pDb,                    /* Connection handle */
  int nLsmLevel,                  /* Number of levels to store in LSM */
  int bOvfl,                      /* True if free list is stored in LSM */
  int bLog,                       /* True to update log-offset fields */
  i64 iId,                        /* Checkpoint id */
  int bCksum,                     /* If true, include checksums */
  void **ppCkpt,                  /* OUT: Buffer containing checkpoint */
  int *pnCkpt                     /* OUT: Size of checkpoint in bytes */
){
  int rc = LSM_OK;                /* Return Code */
................................................................................
  int nHdrLevel = 0;              /* Number of levels in checkpoint */
  int iLevel;                     /* Used to count out nHdrLevel levels */
  int iOut = 0;                   /* Current offset in aCkpt[] */
  Level *pLevel;                  /* Level iterator */
  int i;                          /* Iterator used while serializing freelist */
  CkptBuffer ckpt;

  assert( bOvfl || nLsmLevel==0 );
  
  /* Initialize the output buffer */
  memset(&ckpt, 0, sizeof(CkptBuffer));
  ckpt.pEnv = pDb->pEnv;
  iOut = CKPT_HDR_SIZE;






  /* Write the log offset into the checkpoint. */
  ckptExportLog(pDb, bLog, &ckpt, &iOut, &rc);

  /* Write the append-point list */
  ckptExportAppendlist(pDb, &ckpt, &iOut, &rc);

  /* Figure out how many levels will be written to the checkpoint. */
  for(pLevel=lsmDbSnapshotLevel(pSnap); pLevel; pLevel=pLevel->pNext) nAll++;
  nHdrLevel = nAll - nLsmLevel;
  assert( nHdrLevel>0 );

  /* Serialize nHdrLevel levels. */
  iLevel = 0;
  for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nHdrLevel; pLevel=pLevel->pNext){
    ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
    iLevel++;
  }

  /* Write the freelist delta (if bOvfl is true) or else the entire free-list
  ** (if bOvfl is false).  */
  assert( bOvfl==0 );             /* TODO: Fix this */
  if( rc==LSM_OK ){
    int nFree = pSnap->freelist.nEntry;
    ckptSetValue(&ckpt, iOut++, nFree, &rc);
    for(i=0; i<nFree; i++){
      FreelistEntry *p = &pSnap->freelist.aEntry[i];
      ckptSetValue(&ckpt, iOut++, p->iBlk, &rc);
      ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc);
      ckptSetValue(&ckpt, iOut++, p->iId & 0xFFFFFFFF, &rc);
    }
................................................................................
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nHdrLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, bOvfl, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);
  }
................................................................................
    *paVal = 0;
  }

  return rc;
}

/*
** Estimate whether a serialized checkpoint for the current worker
** snapshot, plus one additional new level, can be held in a 1024 integer
** budget.
**
** On return *pnLsmLevel is always set to the number of levels, counted
** from the first level that does not fit through to the end of the level
** list, that exceed the budget. The return value is non-zero if either
** one or more levels did not fit, or if the levels fit but the complete
** free-list does not. It is zero only if both the levels and the
** free-list fit, in which case *pnLsmLevel is also zero.
*/
int lsmCheckpointOverflow(
  lsm_db *pDb,                    /* Database handle (must hold worker lock) */
  int *pnLsmLevel                 /* OUT: Number of levels to store in LSM */
){
  const int nSimple = 2 + CKPT_SEGMENT_SIZE;  /* Ints for a non-merging level */
  Level *pIter;                   /* Iterator over the worker's level list */
  int nAvail;                     /* Integers still unallocated in the budget */
  int nOvfl;                      /* Levels that did not fit */

  /* Start with 1024 integers and deduct the fixed-size parts: the header,
  ** the log pointer, the checksum and the append-point list.  */
  nAvail = 1024 - CKPT_HDR_SIZE - CKPT_LOGPTR_SIZE - CKPT_CKSUM_SIZE;
  nAvail -= CKPT_APPENDLIST_SIZE;

  /* Reserve room for a free-list of LSM_CKPT_MIN_FREELIST entries: one
  ** integer plus three integers per entry.  */
  nAvail -= 1;
  nAvail -= LSM_CKPT_MIN_FREELIST * 3;

  /* Reserve room for the new level that may be created. */
  nAvail -= nSimple;

  /* Walk the levels, deducting the size of each from the budget until one
  ** no longer fits. A level with no merge in progress costs nSimple
  ** integers. A level undergoing a merge is costed as if it had
  ** (nRight+1) input segments, which may overestimate its true size.  */
  pIter = lsmDbSnapshotLevel(pDb->pWorker);
  while( pIter ){
    int nNeed = nSimple;
    if( pIter->pMerge ){
      nNeed = 2 + (1 + pIter->nRight) * nSimple + 1 + 1 + 2;
    }
    if( nNeed>nAvail ) break;
    nAvail -= nNeed;
    pIter = pIter->pNext;
  }

  /* Everything from pIter onwards (if anything) did not fit. */
  for(nOvfl=0; pIter; pIter=pIter->pNext) nOvfl++;
  *pnLsmLevel = nOvfl;

  /* Deduct the space needed to store the worker's whole free-list: one
  ** integer plus three per entry.  */
  nAvail -= 1 + 3*pDb->pWorker->freelist.nEntry;

  if( nOvfl>0 ) return 1;
  return nAvail<0;
}

/*
** Read the checkpoint id from meta-page pPg.
*/
static i64 ckptLoadId(MetaPage *pPg){
  i64 ret = 0;
................................................................................
      }
      return rc;
    }
  }
}

/*
** Populate pDb->pWorker by deserializing the worker checkpoint held in
** the shared-memory header.
**
** If the aWorker[] copy fails its checksum, it is first overwritten with
** the contents of aClient[] and checked again. If it still fails,
** LSM_CORRUPT_BKPT is returned. Otherwise the return value is whatever
** lsmCheckpointDeserialize() returns.
*/
int lsmCheckpointLoadWorker(lsm_db *pDb){
  ShmHeader *pHdr = pDb->pShmhdr;
  int bOk;                        /* True once aWorker[] passes its checksum */

  /* Must be holding the WORKER lock to do this */
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) );

  bOk = ckptChecksumOk(pHdr->aWorker);
  if( !bOk ){
    /* Fall back to the client copy. Its length in integers is read from
    ** its own header.
    ** NOTE(review): nWord is not range-checked before the memcpy() -
    ** confirm that it cannot exceed the capacity of aWorker[].  */
    int nWord = (int)pHdr->aClient[CKPT_HDR_NCKPT];
    memcpy(pHdr->aWorker, pHdr->aClient, nWord*sizeof(u32));
    bOk = ckptChecksumOk(pHdr->aWorker);
  }
  if( !bOk ){
    return LSM_CORRUPT_BKPT;
  }

  return lsmCheckpointDeserialize(pDb, pHdr->aWorker, &pDb->pWorker);
}

int lsmCheckpointDeserialize(
  lsm_db *pDb, 

  u32 *aCkpt, 
  Snapshot **ppSnap
){
  int rc = LSM_OK;
  Snapshot *pNew;

  pNew = (Snapshot *)lsmMallocZeroRc(pDb->pEnv, sizeof(Snapshot), &rc);
................................................................................
    rc = ckptLoadLevels(pDb, aCkpt, &iIn, nLevel, &pNew->pLevel);

    /* Make a copy of the append-list */
    nCopy = sizeof(u32) * LSM_APPLIST_SZ;
    memcpy(pNew->aiAppend, &aCkpt[CKPT_HDR_SIZE+CKPT_LOGPTR_SIZE], nCopy);

    /* Copy the free-list */


    nFree = aCkpt[iIn++];
    if( nFree ){
      pNew->freelist.aEntry = (FreelistEntry *)lsmMallocRc(
          pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc
      );
      if( rc==LSM_OK ){
        int i;
        for(i=0; i<nFree; i++){

          pNew->freelist.aEntry[i].iBlk = aCkpt[iIn++];
          pNew->freelist.aEntry[i].iId = ((i64)(aCkpt[iIn])<<32) + aCkpt[iIn+1];
          iIn += 2;
        }
        pNew->freelist.nEntry = pNew->freelist.nAlloc = nFree;

      }
    }
  }

  if( rc!=LSM_OK ){
    lsmFreeSnapshot(pDb->pEnv, pNew);
    pNew = 0;
................................................................................
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) );
  assert( pDb->pWorker );

  for(p=pDb->pWorker->pLevel; p; p=p->pNext){
    nRhs += (p->nRight ? p->nRight : 1);
  }

  return (nRhs >= LSM_CKPT_MAX_LEVELS);
}

/*
** The connection passed as the only argument is currently the worker
** connection. Some work has been performed on the database by the connection,
** but no new snapshot has been written into shared memory.
**
** This function updates the shared-memory worker and client snapshots with
** the new snapshot produced by the work performed by pDb.
**
** If successful, LSM_OK is returned. Otherwise, if an error occurs, an LSM
** error code is returned.
*/
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;

  rc = lsmCheckpointExport(pDb, 0, 0, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aWorker, p, n);
  lsmShmBarrier(pDb);
  memcpy(pShm->aClient, p, n);







|
<







 







>
>



>
>
>
>
>
>
>
>
>
>
>
>
>
>

|


<
>

<
<
<
<
>
>
>
>
>

<
<
<
<
<
<
>
>
>
>
|

<
<
>
>
>
>

<
>
>

>







 







<
|







 







<
<




>
>
>
>
>









|









|
<
<

|







 







|







 







|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>










>
|
>
>
>
>
>
>
>




>







 







>
>
|
|
|
|
|
|
|
|
>
|
|
|
|
|
>







 







|













|






|







32
33
34
35
36
37
38
39

40
41
42
43
44
45
46
..
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113

114
115




116
117
118
119
120
121






122
123
124
125
126
127


128
129
130
131
132

133
134
135
136
137
138
139
140
141
142
143
...
353
354
355
356
357
358
359

360
361
362
363
364
365
366
367
...
371
372
373
374
375
376
377


378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406


407
408
409
410
411
412
413
414
415
...
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
...
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
...
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
...
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
....
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
**     2. The checkpoint id LSW.
**     3. The number of integer values in the entire checkpoint, including 
**        the two checksum values.
**     4. The total number of blocks in the database.
**     5. The block size.
**     6. The number of levels.
**     7. The nominal database page size.
**     8. Flag indicating if there exists a FREELIST record in the database.

**
**   Log pointer:
**
**     4 integers (2 for a 64-bit offset and 2 for a 64-bit checksum). See 
**     ckptExportLog() and ckptImportLog().
**
**   Append points:
................................................................................
**     1. First page of array,
**     2. Last page of array,
**     3. Root page of array (or 0),
**     4. Size of array in pages,
*/

/*
** LARGE NUMBERS OF LEVEL RECORDS:
**
** A limit on the number of rhs segments that may be present in the database
** file. Defining this limit ensures that all level records fit within
** the 4096 byte limit for checkpoint blobs.
**
** The number of right-hand-side segments in a database is counted as 
** follows:
**
**   * For each level in the database not undergoing a merge, add 1.
**
**   * For each level in the database that is undergoing a merge, add 
**     the number of segments on the rhs of the level.
**
** A level record not undergoing a merge is 6 integers. A level record 
** with nRhs rhs segments and (nRhs+1) input segments (i.e. including the 
** separators from the next level) is (6*nRhs+12) integers. The maximum
** per right-hand-side level is therefore 12 integers. So the maximum
** size of all level records in a checkpoint is 12*40=480 integers.
*/
#define LSM_MAX_RHS_SEGMENTS 40

/*

** LARGE NUMBERS OF FREELIST ENTRIES:
**




** A limit on the number of free-list entries stored in a checkpoint. Since
** each free-list entry consists of 3 integers, the maximum free-list size
** is 3*100=300 integers. Combined with the limit on rhs segments defined
** above, this ensures that a checkpoint always fits within a 4096 byte
** meta page.
**






** If the database contains more than 100 free blocks, the "overflow" flag
** in the checkpoint header is set and the remainder are stored in the
** system FREELIST entry in the LSM (along with user data). The value
** accompanying the FREELIST key in the LSM is, like a checkpoint, an array
** of 32-bit big-endian integers. As follows:
**


**     For each entry:
**       a. Block number of free block.
**       b. MSW of associated checkpoint id.
**       c. LSW of associated checkpoint id.
**

** The number of entries is not required - it is implied by the size of the
** value blob containing the integer array.
*/
#define LSM_MAX_FREELIST_ENTRIES 100

/*
** The argument to this macro must be of type u32. On a little-endian
** architecture, it returns the u32 value that results from interpreting
** the 4 bytes as a big-endian value. On a big-endian architecture, it
** returns the value that would be produced by interpreting the 4 bytes
** of the input value as a little-endian integer.
................................................................................
    ckptSetValue(p, iOut++, aiAppend[i], pRc);
  }
  *piOut = iOut;
};

int lsmCheckpointExport( 
  lsm_db *pDb,                    /* Connection handle */

  int nOvfl,                      /* Number of free-list entries in LSM */
  int bLog,                       /* True to update log-offset fields */
  i64 iId,                        /* Checkpoint id */
  int bCksum,                     /* If true, include checksums */
  void **ppCkpt,                  /* OUT: Buffer containing checkpoint */
  int *pnCkpt                     /* OUT: Size of checkpoint in bytes */
){
  int rc = LSM_OK;                /* Return Code */
................................................................................
  int nHdrLevel = 0;              /* Number of levels in checkpoint */
  int iLevel;                     /* Used to count out nHdrLevel levels */
  int iOut = 0;                   /* Current offset in aCkpt[] */
  Level *pLevel;                  /* Level iterator */
  int i;                          /* Iterator used while serializing freelist */
  CkptBuffer ckpt;



  /* Initialize the output buffer */
  memset(&ckpt, 0, sizeof(CkptBuffer));
  ckpt.pEnv = pDb->pEnv;
  iOut = CKPT_HDR_SIZE;

  /* If nOvfl is negative, copy the value from the previous worker snapshot. */
  if( nOvfl<0 ){
    nOvfl = (int)(pDb->pShmhdr->aWorker[CKPT_HDR_OVFL]);
  }

  /* Write the log offset into the checkpoint. */
  ckptExportLog(pDb, bLog, &ckpt, &iOut, &rc);

  /* Write the append-point list */
  ckptExportAppendlist(pDb, &ckpt, &iOut, &rc);

  /* Figure out how many levels will be written to the checkpoint. */
  for(pLevel=lsmDbSnapshotLevel(pSnap); pLevel; pLevel=pLevel->pNext) nAll++;
  nHdrLevel = nAll;
  assert( nHdrLevel>0 );

  /* Serialize nHdrLevel levels. */
  iLevel = 0;
  for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nHdrLevel; pLevel=pLevel->pNext){
    ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
    iLevel++;
  }

  /* Write the freelist */


  if( rc==LSM_OK ){
    int nFree = (pSnap->freelist.nEntry - nOvfl);
    ckptSetValue(&ckpt, iOut++, nFree, &rc);
    for(i=0; i<nFree; i++){
      FreelistEntry *p = &pSnap->freelist.aEntry[i];
      ckptSetValue(&ckpt, iOut++, p->iBlk, &rc);
      ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc);
      ckptSetValue(&ckpt, iOut++, p->iId & 0xFFFFFFFF, &rc);
    }
................................................................................
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nHdrLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, nOvfl, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);
  }
................................................................................
    *paVal = 0;
  }

  return rc;
}

/*
** The worker lock must be held to call this function.
**
** The function is used to determine if the FREELIST record is required
** to store part of the free-block list, i.e. whether the worker snapshot
** holds more than LSM_MAX_FREELIST_ENTRIES free-list entries.
**
** Before returning, *pnOvfl is set to the number of free-list entries
** that exceed LSM_MAX_FREELIST_ENTRIES (zero if there are none).
**
** Arguments ppVal and pnVal must either both be NULL or both be non-NULL.
** If they are NULL, only *pnOvfl is computed. If they are non-NULL:
**
**   * If there are no overflow entries, *ppVal and *pnVal are zeroed.
**
**   * Otherwise, the last *pnOvfl entries of the free-list are serialized
**     into a CkptBuffer as three 32-bit integers each (block number, then
**     the MSW and LSW of the entry id), ckptChangeEndianness() is applied
**     to the result, and *ppVal and *pnVal are set to the buffer and its
**     size in bytes.
**
** The return value is an LSM error code: LSM_OK unless one of the
** ckptSetValue() calls reports an error through its final argument.
*/
int lsmCheckpointOverflow(
  lsm_db *pDb,                    /* Database handle (must hold worker lock) */
  void **ppVal,                   /* OUT: lsmMalloc'd buffer */
  int *pnVal,                     /* OUT: Size of *ppVal in bytes */
  int *pnOvfl                     /* OUT: Number of overflow freelist entries */
){
  int rc = LSM_OK;                /* Return code */
  int nRet;                       /* Value to store in *pnOvfl */
  Snapshot *p = pDb->pWorker;     /* Worker snapshot */

  assert( lsmShmAssertWorker(pDb) );
  assert( (pnVal==0)==(ppVal==0) );
  assert( pnOvfl );

  nRet = p->freelist.nEntry - LSM_MAX_FREELIST_ENTRIES;
  if( nRet<=0 ){
    /* Everything fits in the checkpoint. No FREELIST record is required. */
    nRet = 0;
    if( ppVal ){
      *pnVal = 0;
      *ppVal = 0;
    }
  }else if( ppVal ){
    int i;                        /* Iterator variable */
    int iOut = 0;                 /* Current size of blob in ckpt */
    CkptBuffer ckpt;              /* Used to build FREELIST blob */

    memset(&ckpt, 0, sizeof(CkptBuffer));
    ckpt.pEnv = pDb->pEnv;

    /* Serialize the final nRet entries of the free-list, 3 integers each. */
    for(i=p->freelist.nEntry-nRet; rc==LSM_OK && i<p->freelist.nEntry; i++){
      FreelistEntry *pEntry = &p->freelist.aEntry[i];
      ckptSetValue(&ckpt, iOut++, pEntry->iBlk, &rc);
      ckptSetValue(&ckpt, iOut++, (pEntry->iId >> 32) & 0xFFFFFFFF, &rc);
      ckptSetValue(&ckpt, iOut++, pEntry->iId & 0xFFFFFFFF, &rc);
    }

    /* NOTE(review): the three statements below run even if rc is not LSM_OK
    ** at this point. Whether ckpt.aCkpt is still a valid buffer after a
    ** failed ckptSetValue() depends on that function, which is not visible
    ** here - confirm that this path is safe and does not leak. */
    ckptChangeEndianness(ckpt.aCkpt, iOut);
    *ppVal = ckpt.aCkpt;
    *pnVal = iOut*sizeof(u32);
  }

  *pnOvfl = nRet;
  return rc;
}

/*
** Connection pDb must be the worker to call this function.
**
** Load the FREELIST record from the database using lsmSortedLoadFreelist().
** Decode it and append the results to list pFreelist. The record is an
** array of 32-bit integers, three per entry: the block number followed by
** the MSW and LSW of the associated id.
**
** LSM_OK is returned if successful. If the size of the record is not a
** whole number of 3-integer entries, LSM_CORRUPT_BKPT is returned. An error
** returned by lsmSortedLoadFreelist() or lsmFreelistAppend() is passed
** through to the caller unchanged. The buffer obtained from
** lsmSortedLoadFreelist() is always released with lsmFree() before
** returning.
*/
int lsmCheckpointLoadOverflow(
  lsm_db *pDb,
  Freelist *pFreelist
){
  int rc;                         /* Return code */
  int nVal = 0;                   /* Size of pVal in bytes */
  void *pVal = 0;                 /* Buffer containing the FREELIST record */
  assert( lsmShmAssertWorker(pDb) );

  rc = lsmSortedLoadFreelist(pDb, &pVal, &nVal);
  if( pVal ){
    /* Only decode the record if it was loaded without error. This way an
    ** error code from the load is never replaced by LSM_CORRUPT_BKPT. */
    if( rc==LSM_OK ){
      const int nEntryByte = 3 * (int)sizeof(u32);

      /* Each entry occupies exactly three u32 values. Anything else,
      ** including trailing bytes that do not make up a full integer,
      ** indicates a corrupt record. */
      if( nVal<0 || (nVal % nEntryByte)!=0 ){
        rc = LSM_CORRUPT_BKPT;
      }else{
        u32 *aFree = (u32 *)pVal;
        int nFree = nVal / (int)sizeof(u32);
        int i;

        ckptChangeEndianness(aFree, nFree);
        for(i=0; rc==LSM_OK && i<nFree; i+=3){
          int iBlk = (int)aFree[i];
          i64 iId = ((i64)(aFree[i+1])<<32) + (i64)aFree[i+2];
          rc = lsmFreelistAppend(pDb->pEnv, pFreelist, iBlk, iId);
        }
      }
    }

    lsmFree(pDb->pEnv, pVal);
  }

  return rc;
}

/*
** Read the checkpoint id from meta-page pPg.
*/
static i64 ckptLoadId(MetaPage *pPg){
  i64 ret = 0;
................................................................................
      }
      return rc;
    }
  }
}

/*
** Deserialize the worker snapshot stored in shared memory into pDb->pWorker.
**
** If the aWorker[] copy of the snapshot fails its checksum, it is first
** overwritten with the contents of aClient[]. If the checksum still fails
** after that, LSM_CORRUPT_BKPT is returned. If the deserialized snapshot
** has a non-zero nFreelistOvfl value, the overflow free-list entries are
** loaded with lsmCheckpointLoadOverflow() and nFreelistOvfl is zeroed.
*/
int lsmCheckpointLoadWorker(lsm_db *pDb){
  ShmHeader *pHdr = pDb->pShmhdr;
  int rc;

  /* Must be holding the WORKER lock to do this */
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) );

  if( !ckptChecksumOk(pHdr->aWorker) ){
    /* Replace the damaged worker snapshot with a copy of the client one. */
    int nWord = (int)pHdr->aClient[CKPT_HDR_NCKPT];
    memcpy(pHdr->aWorker, pHdr->aClient, nWord*sizeof(u32));
    if( !ckptChecksumOk(pHdr->aWorker) ){
      return LSM_CORRUPT_BKPT;
    }
  }

  rc = lsmCheckpointDeserialize(pDb, 1, pHdr->aWorker, &pDb->pWorker);
  if( rc==LSM_OK ){
    if( pDb->pWorker->nFreelistOvfl ){
      /* Part of the free-list is stored in the FREELIST record. */
      rc = lsmCheckpointLoadOverflow(pDb, &pDb->pWorker->freelist);
      pDb->pWorker->nFreelistOvfl = 0;
    }
  }

  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  return rc;
}

int lsmCheckpointDeserialize(
  lsm_db *pDb, 
  int bInclFreelist,              /* If true, deserialize free-list */
  u32 *aCkpt, 
  Snapshot **ppSnap
){
  int rc = LSM_OK;
  Snapshot *pNew;

  pNew = (Snapshot *)lsmMallocZeroRc(pDb->pEnv, sizeof(Snapshot), &rc);
................................................................................
    rc = ckptLoadLevels(pDb, aCkpt, &iIn, nLevel, &pNew->pLevel);

    /* Make a copy of the append-list */
    nCopy = sizeof(u32) * LSM_APPLIST_SZ;
    memcpy(pNew->aiAppend, &aCkpt[CKPT_HDR_SIZE+CKPT_LOGPTR_SIZE], nCopy);

    /* Copy the free-list */
    if( bInclFreelist ){
      pNew->nFreelistOvfl = aCkpt[CKPT_HDR_OVFL];
      nFree = aCkpt[iIn++];
      if( nFree ){
        pNew->freelist.aEntry = (FreelistEntry *)lsmMallocZeroRc(
            pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc
        );
        if( rc==LSM_OK ){
          int i;
          for(i=0; i<nFree; i++){
            FreelistEntry *p = &pNew->freelist.aEntry[i];
            p->iBlk = aCkpt[iIn++];
            p->iId = ((i64)(aCkpt[iIn])<<32) + aCkpt[iIn+1];
            iIn += 2;
          }
          pNew->freelist.nEntry = pNew->freelist.nAlloc = nFree;
        }
      }
    }
  }

  if( rc!=LSM_OK ){
    lsmFreeSnapshot(pDb->pEnv, pNew);
    pNew = 0;
................................................................................
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) );
  assert( pDb->pWorker );

  for(p=pDb->pWorker->pLevel; p; p=p->pNext){
    nRhs += (p->nRight ? p->nRight : 1);
  }

  return (nRhs >= LSM_MAX_RHS_SEGMENTS);
}

/*
** The connection passed as the only argument is currently the worker
** connection. Some work has been performed on the database by the connection,
** but no new snapshot has been written into shared memory.
**
** This function updates the shared-memory worker and client snapshots with
** the new snapshot produced by the work performed by pDb.
**
** If successful, LSM_OK is returned. Otherwise, if an error occurs, an LSM
** error code is returned.
*/
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;

  rc = lsmCheckpointExport(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aWorker, p, n);
  lsmShmBarrier(pDb);
  memcpy(pShm->aClient, p, n);

Changes to src/lsm_file.c.

1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
....
1421
1422
1423
1424
1425
1426
1427


1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
....
1445
1446
1447
1448
1449
1450
1451








1452



1453
1454
1455
1456

1457
1458
1459
1460
1461

1462
1463
    lsmStringAppendf(&str, " %d", pArray->iLast);

    *pzOut = str.z;
  }

  if( bUnlock ){
    int rcwork = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcwork);
  }
  return rc;
}

/*
** Helper function for lsmFsIntegrityCheck()
*/
................................................................................
** out-of-range block numbers.
**
** If no errors are found, non-zero is returned. If an error is found, an
** assert() fails.
*/
int lsmFsIntegrityCheck(lsm_db *pDb){
  int i;


  FileSystem *pFS = pDb->pFS;
  u8 *aUsed;
  Level *pLevel;
  Snapshot *pWorker = pDb->pWorker;
  int nBlock = pWorker->nBlock;


  aUsed = lsmMallocZero(pDb->pEnv, nBlock);
  if( aUsed==0 ){
    /* Malloc has failed. Since this function is only called within debug
    ** builds, this probably means the user is running an OOM injection test.
    ** Regardless, it will not be possible to run the integrity-check at this
    ** time, so assume the database is Ok and return non-zero. */
................................................................................
    int i;
    checkBlocks(pFS, &pLevel->lhs, (pLevel->nRight!=0), nBlock, aUsed);
    for(i=0; i<pLevel->nRight; i++){
      checkBlocks(pFS, &pLevel->aRhs[i], 0, nBlock, aUsed);
    }
  }









  for(i=0; i<pWorker->freelist.nEntry; i++){



    u32 iBlk = pWorker->freelist.aEntry[i].iBlk;
    assert( iBlk<=nBlock );
    assert( aUsed[iBlk-1]==0 );
    aUsed[iBlk-1] = 1;

  }

  for(i=0; i<nBlock; i++) assert( aUsed[i]==1 );

  lsmFree(pDb->pEnv, aUsed);

  return 1;
}







|







 







>
>





<







 







>
>
>
>
>
>
>
>
|
>
>
>
|
|
|
|
>





>


1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
....
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434

1435
1436
1437
1438
1439
1440
1441
....
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
    lsmStringAppendf(&str, " %d", pArray->iLast);

    *pzOut = str.z;
  }

  if( bUnlock ){
    int rcwork = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcwork);
  }
  return rc;
}

/*
** Helper function for lsmFsIntegrityCheck()
*/
................................................................................
** out-of-range block numbers.
**
** If no errors are found, non-zero is returned. If an error is found, an
** assert() fails.
*/
int lsmFsIntegrityCheck(lsm_db *pDb){
  int i;
  int j;
  Freelist freelist = {0, 0, 0};
  FileSystem *pFS = pDb->pFS;
  u8 *aUsed;
  Level *pLevel;
  Snapshot *pWorker = pDb->pWorker;
  int nBlock = pWorker->nBlock;


  aUsed = lsmMallocZero(pDb->pEnv, nBlock);
  if( aUsed==0 ){
    /* Malloc has failed. Since this function is only called within debug
    ** builds, this probably means the user is running an OOM injection test.
    ** Regardless, it will not be possible to run the integrity-check at this
    ** time, so assume the database is Ok and return non-zero. */
................................................................................
    int i;
    checkBlocks(pFS, &pLevel->lhs, (pLevel->nRight!=0), nBlock, aUsed);
    for(i=0; i<pLevel->nRight; i++){
      checkBlocks(pFS, &pLevel->aRhs[i], 0, nBlock, aUsed);
    }
  }

  if( pWorker->nFreelistOvfl ){
    int rc = lsmCheckpointLoadOverflow(pDb, &freelist);
    assert( rc==LSM_OK || rc==LSM_NOMEM );
    if( rc!=LSM_OK ) return 1;
  }

  for(j=0; j<2; j++){
    Freelist *pFreelist;
    if( j==0 ) pFreelist = &pWorker->freelist;
    if( j==1 ) pFreelist = &freelist;

    for(i=0; i<pFreelist->nEntry; i++){
      u32 iBlk = pFreelist->aEntry[i].iBlk;
      assert( iBlk<=nBlock );
      assert( aUsed[iBlk-1]==0 );
      aUsed[iBlk-1] = 1;
    }
  }

  for(i=0; i<nBlock; i++) assert( aUsed[i]==1 );

  lsmFree(pDb->pEnv, aUsed);
  lsmFree(pDb->pEnv, freelist.aEntry);
  return 1;
}

Changes to src/lsm_main.c.

123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
...
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
...
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
...
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
#if 0
  rc = lsmCheckpointWrite(pDb);
#endif

  rc = lsmBeginWork(pDb);
  if( rc==LSM_OK ) rc = lsmSortedAutoWork(pDb, nUnit);
  if( pDb->pWorker && pDb->pWorker->pLevel ){
    lsmFinishWork(pDb, 0, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcdummy);
  }
  return rc;
}

/*
** If required, run the recovery procedure to initialize the database.
** Return LSM_OK if successful or an error code otherwise.
................................................................................

/*
** This function flushes the contents of the in-memory tree to disk. It
** returns LSM_OK if successful, or an error code otherwise.
*/
int lsmFlushToDisk(lsm_db *pDb){
  int rc = LSM_OK;                /* Return code */
  int nLsmLevel;
  int bOvfl;

  /* Must not hold the worker snapshot when this is called. */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);

  /* Save the position of each open cursor belonging to pDb. */
  if( rc==LSM_OK ){
................................................................................
    rc = lsmSortedAutoWork(pDb, LSM_AUTOWORK_QUANT);
  }

  /* Write the contents of the in-memory tree into the database file and 
  ** update the worker snapshot accordingly. Then flush the contents of 
  ** the db file to disk too. No calls to fsync() are made here - just 
  ** write().  */
  if( rc==LSM_OK ) bOvfl = lsmCheckpointOverflow(pDb, &nLsmLevel);
  if( rc==LSM_OK ) rc = lsmSortedFlushTree(pDb, nLsmLevel, bOvfl);
  if( rc==LSM_OK ) lsmTreeClear(pDb);

  lsmFinishWork(pDb, 1, &rc);

  /* Restore the position of any open cursors */
  if( rc==LSM_OK && pDb->pCsr ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
    pDb->pClient = 0;
    rc = lsmCheckpointLoad(pDb);
    if( rc==LSM_OK ){
      rc = lsmCheckpointDeserialize(pDb, pDb->aSnapshot, &pDb->pClient);
    }
    if( rc==LSM_OK ){
      rc = lsmRestoreCursors(pDb);
    }
  }

#if 0
................................................................................
    }
    lsmStringAppend(&s, "}", 1);
  }
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */
  if( bUnlock ){
    int rcwork = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcwork);
  }
  *pzOut = s.z;
  return rc;
}

int lsm_info(lsm_db *pDb, int eParam, ...){
  int rc = LSM_OK;







|


|







 







|
<







 







<
|


|







|







 







|
|







123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
...
265
266
267
268
269
270
271
272

273
274
275
276
277
278
279
...
287
288
289
290
291
292
293

294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
...
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
#if 0
  rc = lsmCheckpointWrite(pDb);
#endif

  rc = lsmBeginWork(pDb);
  if( rc==LSM_OK ) rc = lsmSortedAutoWork(pDb, nUnit);
  if( pDb->pWorker && pDb->pWorker->pLevel ){
    lsmFinishWork(pDb, 0, -1, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }
  return rc;
}

/*
** If required, run the recovery procedure to initialize the database.
** Return LSM_OK if successful or an error code otherwise.
................................................................................

/*
** This function flushes the contents of the in-memory tree to disk. It
** returns LSM_OK if successful, or an error code otherwise.
*/
int lsmFlushToDisk(lsm_db *pDb){
  int rc = LSM_OK;                /* Return code */
  int nOvfl = 0;                  /* Number of free-list entries in LSM */


  /* Must not hold the worker snapshot when this is called. */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);

  /* Save the position of each open cursor belonging to pDb. */
  if( rc==LSM_OK ){
................................................................................
    rc = lsmSortedAutoWork(pDb, LSM_AUTOWORK_QUANT);
  }

  /* Write the contents of the in-memory tree into the database file and 
  ** update the worker snapshot accordingly. Then flush the contents of 
  ** the db file to disk too. No calls to fsync() are made here - just 
  ** write().  */

  if( rc==LSM_OK ) rc = lsmSortedFlushTree(pDb, &nOvfl);
  if( rc==LSM_OK ) lsmTreeClear(pDb);

  lsmFinishWork(pDb, 1, nOvfl, &rc);

  /* Restore the position of any open cursors */
  if( rc==LSM_OK && pDb->pCsr ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
    pDb->pClient = 0;
    rc = lsmCheckpointLoad(pDb);
    if( rc==LSM_OK ){
      rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
    }
    if( rc==LSM_OK ){
      rc = lsmRestoreCursors(pDb);
    }
  }

#if 0
................................................................................
    }
    lsmStringAppend(&s, "}", 1);
  }
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */
  if( bUnlock ){
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }
  *pzOut = s.z;
  return rc;
}

int lsm_info(lsm_db *pDb, int eParam, ...){
  int rc = LSM_OK;

Changes to src/lsm_shared.c.

85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
...
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
...
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
...
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
...
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
...
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
...
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
....
1179
1180
1181
1182
1183
1184
1185




1186
1187
1188
1189
1190
1191
1192
1193
1194
# define assertNotInFreelist(x,y)
# define assertMustbeWorker(x)
#endif

/*
** Append an entry to the free-list.
*/
static int flAppendEntry(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){

  /* Assert that this is not an attempt to insert a duplicate block number */
  assertNotInFreelist(p, iBlk);

  /* Extend the space allocated for the freelist, if required */
  assert( p->nAlloc>=p->nEntry );
  if( p->nAlloc==p->nEntry ){
................................................................................

  return LSM_OK;
}

/*
** Add block iBlk, with id value 1, as the first entry of free-list p.
**
** The entry is first added using flAppendEntry(). If that succeeds, the
** first (nEntry-1) entries of the array are shifted up by one slot and
** slot 0 is overwritten with the new entry. The return value is the code
** returned by flAppendEntry().
*/
static int flInsertEntry(lsm_env *pEnv, Freelist *p, int iBlk){
  int rc = flAppendEntry(pEnv, p, iBlk, 1);
  if( rc!=LSM_OK ) return rc;

  /* Regions overlap, so memmove() (not memcpy()) is required here. */
  memmove(p->aEntry + 1, p->aEntry, (p->nEntry-1) * sizeof(FreelistEntry));
  p->aEntry[0].iBlk = iBlk;
  p->aEntry[0].iId = 1;
  return LSM_OK;
}
................................................................................
    */
    lsmMutexEnter(pEnv, p->pClientMutex);
    snapshotDecrRefcnt(pEnv, pSnap);
    lsmMutexLeave(pEnv, p->pClientMutex);
  }
}

/*
** Create a new client snapshot based on the current contents of the worker 
** snapshot. The connection must be the worker to call this function.
**
** NOTE: This function is currently disabled. It unconditionally fails an
** assert() and returns 0. The original implementation is retained below
** inside an "#if 0" block and is never compiled.
*/
int lsmDbUpdateClient(lsm_db *pDb, int nLsmLevel, int bOvfl){
  assert( 0 );
  return 0;
#if 0
  Database *p = pDb->pDatabase;   /* Database handle */
  Snapshot *pOld;                 /* Old client snapshot object */
  Snapshot *pNew;                 /* New client snapshot object */
  int nByte;                      /* Memory required for new client snapshot */
  int rc = LSM_OK;                /* Return code */
  int nLevel = 0;                 /* Number of levels in worker snapshot */
  int nRight = 0;                 /* Total number of rhs in worker */
  int nKeySpace = 0;              /* Total size of split keys (always 0 here) */
  Level *pLevel;                  /* Used to iterate through worker levels */
  Level **ppLink;                 /* Used to link levels together */
  u8 *pAvail;                     /* Used to divide up allocation */

  /* Must be the worker to call this. */
  assertMustbeWorker(pDb);

  /* Allocate space for the client snapshot and all levels. */
  for(pLevel=p->worker.pLevel; pLevel; pLevel=pLevel->pNext){
    nLevel++;
    nRight += pLevel->nRight;
  }
  nByte = sizeof(Snapshot) 
        + nLevel * sizeof(Level)
        + nRight * sizeof(Segment)
        + nKeySpace;
  pNew = (Snapshot *)lsmMallocZero(pDb->pEnv, nByte);
  if( !pNew ) return LSM_NOMEM_BKPT;
  pNew->pDatabase = p;
  pNew->iId = p->worker.iId;

  /* Copy the linked-list of Level structures. The copies are carved out
  ** of the space following the Snapshot object in the same allocation. */
  pAvail = (u8 *)&pNew[1];
  ppLink = &pNew->pLevel;
  for(pLevel=p->worker.pLevel; pLevel && rc==LSM_OK; pLevel=pLevel->pNext){
    Level *pNew;                  /* Shadows the outer (Snapshot*) pNew */

    pNew = (Level *)pAvail;
    memcpy(pNew, pLevel, sizeof(Level));
    pAvail += sizeof(Level);

    if( pNew->nRight ){
      pNew->aRhs = (Segment *)pAvail;
      memcpy(pNew->aRhs, pLevel->aRhs, sizeof(Segment) * pNew->nRight);
      pAvail += (sizeof(Segment) * pNew->nRight);
      lsmSortedSplitkey(pDb, pNew, &rc);
    }

    /* This needs to come after any call to lsmSortedSplitkey(). Splitkey()
    ** uses data within the Merge object to set pNew->pSplitKey and co.  */
    pNew->pMerge = 0;

    *ppLink = pNew;
    ppLink = &pNew->pNext;
  }

  /* Create the serialized version of the new client snapshot. */
  if( p->bDirty && rc==LSM_OK ){
    assert( nLevel>nLsmLevel || p->worker.pLevel==0 );
    rc = lsmCheckpointExport(
        pDb, nLsmLevel, bOvfl, pNew->iId, 1, &pNew->pExport, &pNew->nExport
    );
  }

  if( rc==LSM_OK ){
    /* Initialize the new snapshot ref-count to 1 */
    pNew->nRef = 1;

    lsmDbSnapshotRelease(pDb->pEnv, pDb->pClient);

    /* Install the new client snapshot and release the old. If this
    ** connection currently has a client snapshot, it is switched to the
    ** new snapshot as well (taking an extra reference). */
    lsmMutexEnter(pDb->pEnv, p->pClientMutex);
    pOld = p->pClient;
    pNew->pSnapshotNext = pOld;
    p->pClient = pNew;
    if( pDb->pClient ){
      pDb->pClient = pNew;
      pNew->nRef++;
    }
    lsmMutexLeave(pDb->pEnv, p->pClientMutex);

    lsmDbSnapshotRelease(pDb->pEnv, pOld);
    p->bDirty = 0;

  }else{
    /* An error has occurred. Delete the allocated object. */
    freeClientSnapshot(pDb->pEnv, pNew);
  }

  return rc;
#endif
}

/*
** Allocate a new database file block to write data to, either by extending
** the database file or by recycling a free-list entry. The worker snapshot 
** must be held in order to call this function.
**
** If successful, *piBlk is set to the block number allocated and LSM_OK is
** returned. Otherwise, *piBlk is zeroed and an lsm error code returned.
................................................................................
** LSM_NOMEM).
*/
int lsmBlockFree(lsm_db *pDb, int iBlk){
  Snapshot *pWorker = pDb->pWorker;   /* Worker snapshot */
  int rc;                             /* Return code */

  assertMustbeWorker(pDb);
  assert( pWorker->bRecordDelta==0 );

  /* Add the block to the end of the worker snapshot's free-list, tagged
  ** with the id of the worker snapshot. */
  rc = flAppendEntry(pDb->pEnv, &pWorker->freelist, iBlk, pWorker->iId);
  return rc;
}

/*
** Refree a database block. The worker snapshot must be held in order to call 
** this function.
**
** Refreeing is required when a block is allocated using lsmBlockAllocate()
................................................................................
  Freelist *pFree;                /* Database free-list */

  assert( (nElem%3)==0 );

  pFree = &p->freelist;
  for(i=0; i<nElem; i+=3){
    i64 iId = ((i64)(aElem[i+1]) << 32) + aElem[i+2];
    rc = flAppendEntry(pEnv, pFree, aElem[i], iId);
  }

  return rc;
#endif
  return LSM_OK;
}

................................................................................

/*
** Argument bFlush is true if the contents of the in-memory tree has just
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){
  /* If no error has occurred, serialize the worker snapshot and write
  ** it to shared memory.  */
  if( *pRc==LSM_OK ){
    *pRc = lsmCheckpointSaveWorker(pDb, bFlush);
  }

  if( pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

................................................................................
        if( (i64)pShm->hdr1.iTreeId==iTree 
         && lsmCheckpointId(pShm->aClient, 0)==iSnap
        ){
          /* Read lock has been successfully obtained. Deserialize the 
          ** checkpoint just loaded. TODO: This will be removed after 
          ** lsm_sorted.c is changed to work directly from the serialized
          ** version of the snapshot.  */
          rc = lsmCheckpointDeserialize(pDb, pDb->aSnapshot, &pDb->pClient);
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }
      if( rc==LSM_BUSY ) rc = LSM_OK;
    }
................................................................................
    default:
      assert( !"bad eOp value passed to lsmShmAssertLock()" );
      break;
  }

  return ret;
}




#endif

/*
** Placeholder. This function currently does nothing and ignores its
** argument. NOTE(review): judging by the name it is presumably meant to
** act as a shared-memory barrier - confirm before relying on it for
** ordering guarantees.
*/
void lsmShmBarrier(lsm_db *db){
  /* TODO: Not yet implemented. */
}











|







 







|







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|







 







|







 







|



|







 







|







 







>
>
>
>









85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
...
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
...
339
340
341
342
343
344
345





































































































346
347
348
349
350
351
352
...
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
...
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
...
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
...
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
....
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
# define assertNotInFreelist(x,y)
# define assertMustbeWorker(x)
#endif

/*
** Append an entry to the free-list.
*/
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){

  /* Assert that this is not an attempt to insert a duplicate block number */
  assertNotInFreelist(p, iBlk);

  /* Extend the space allocated for the freelist, if required */
  assert( p->nAlloc>=p->nEntry );
  if( p->nAlloc==p->nEntry ){
................................................................................

  return LSM_OK;
}

/*
** Add block iBlk, with id value 1, as the first entry of free-list p.
**
** The entry is first added using lsmFreelistAppend(). If that succeeds,
** the first (nEntry-1) entries of the array are shifted up by one slot and
** slot 0 is overwritten with the new entry. The return value is the code
** returned by lsmFreelistAppend().
*/
static int flInsertEntry(lsm_env *pEnv, Freelist *p, int iBlk){
  int rc = lsmFreelistAppend(pEnv, p, iBlk, 1);
  if( rc!=LSM_OK ) return rc;

  /* Regions overlap, so memmove() (not memcpy()) is required here. */
  memmove(p->aEntry + 1, p->aEntry, (p->nEntry-1) * sizeof(FreelistEntry));
  p->aEntry[0].iBlk = iBlk;
  p->aEntry[0].iId = 1;
  return LSM_OK;
}
................................................................................
    */
    lsmMutexEnter(pEnv, p->pClientMutex);
    snapshotDecrRefcnt(pEnv, pSnap);
    lsmMutexLeave(pEnv, p->pClientMutex);
  }
}






































































































/*
** Allocate a new database file block to write data to, either by extending
** the database file or by recycling a free-list entry. The worker snapshot 
** must be held in order to call this function.
**
** If successful, *piBlk is set to the block number allocated and LSM_OK is
** returned. Otherwise, *piBlk is zeroed and an lsm error code returned.
................................................................................
** LSM_NOMEM).
*/
int lsmBlockFree(lsm_db *pDb, int iBlk){
  Snapshot *pWorker = pDb->pWorker;   /* Worker snapshot */
  int rc;                             /* Return code */

  assertMustbeWorker(pDb);
  assert( pWorker->bRecordDelta==0 );

  /* Add the block to the end of the worker snapshot's free-list, tagged
  ** with the id of the worker snapshot. */
  rc = lsmFreelistAppend(pDb->pEnv, &pWorker->freelist, iBlk, pWorker->iId);
  return rc;
}

/*
** Refree a database block. The worker snapshot must be held in order to call 
** this function.
**
** Refreeing is required when a block is allocated using lsmBlockAllocate()
................................................................................
  Freelist *pFree;                /* Database free-list */

  assert( (nElem%3)==0 );

  pFree = &p->freelist;
  for(i=0; i<nElem; i+=3){
    i64 iId = ((i64)(aElem[i+1]) << 32) + aElem[i+2];
    rc = lsmFreelistAppend(pEnv, pFree, aElem[i], iId);
  }

  return rc;
#endif
  return LSM_OK;
}

................................................................................

/*
** Argument bFlush is true if the contents of the in-memory tree has just
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){
  /* If no error has occurred, serialize the worker snapshot and write
  ** it to shared memory.  */
  if( *pRc==LSM_OK ){
    *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl);
  }

  if( pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

................................................................................
        if( (i64)pShm->hdr1.iTreeId==iTree 
         && lsmCheckpointId(pShm->aClient, 0)==iSnap
        ){
          /* Read lock has been successfully obtained. Deserialize the 
          ** checkpoint just loaded. TODO: This will be removed after 
          ** lsm_sorted.c is changed to work directly from the serialized
          ** version of the snapshot.  */
          rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }
      if( rc==LSM_BUSY ) rc = LSM_OK;
    }
................................................................................
    default:
      assert( !"bad eOp value passed to lsmShmAssertLock()" );
      break;
  }

  return ret;
}

/*
** Return 1 if lsmShmAssertLock() reports that connection db holds the
** WORKER lock in exclusive mode and db->pWorker is not NULL. Otherwise,
** return 0.
*/
int lsmShmAssertWorker(lsm_db *db){
  if( !lsmShmAssertLock(db, LSM_LOCK_WORKER, LSM_LOCK_EXCL) ) return 0;
  return (db->pWorker!=0);
}
#endif

/*
** Placeholder: this function currently does nothing with its argument.
** NOTE(review): judging by the name this is presumably meant to become a
** memory barrier for shared-memory access - confirm when implementing.
*/
void lsmShmBarrier(lsm_db *db){
  (void)db;                       /* Unused until the TODO is resolved */
  /* TODO */
}




Changes to src/lsm_sorted.c.

260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
...
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
....
2047
2048
2049
2050
2051
2052
2053
2054
2055

2056
2057
2058
2059
2060
2061
2062
....
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
....
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
....
2213
2214
2215
2216
2217
2218
2219
2220




2221
2222
2223
2224


2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248

2249
2250
2251
2252




2253
2254
2255
2256
2257

2258
2259
2260
2261
2262
2263
2264
....
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
....
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
....
3482
3483
3484
3485
3486
3487
3488
3489
3490
3491
3492
3493
3494
3495
3496
3497
3498
3499


3500
3501
3502
3503
3504
3505
3506
3507
3508
3509


3510
3511
3512
3513
3514
3515
3516
3517
3518
3519
3520
3521
....
3528
3529
3530
3531
3532
3533
3534
3535
3536
3537
3538
3539
3540
3541
3542
3543
3544
3545
3546
3547
....
3601
3602
3603
3604
3605
3606
3607
3608
3609
3610
3611
3612
3613
3614
3615
3616





3617


3618
3619
3620
3621
3622
3623
3624
3625
....
4069
4070
4071
4072
4073
4074
4075

4076
4077
4078
4079
4080
4081
4082
4083
4084
4085
4086
4087
4088
4089
4090
4091
4092



4093
4094
4095
4096
4097
4098
4099
  int nTree;
  int *aTree;
  BtreeCursor *pBtCsr;

  Snapshot *pSnap;

  /* Used by cursors flushing the in-memory tree only */
  int nLsmLevel;                  /* Number of levels to store in LSM */
  void *pSystemVal;               /* Pointer to buffer to free */
};

#define CURSOR_DATA_TREE      0
#define CURSOR_DATA_SYSTEM    1
#define CURSOR_DATA_SEGMENT   2

................................................................................
**   flushing the in-memory tree to disk - the new free-list and levels record
**   are flushed along with it.
**
** CURSOR_AT_FREELIST
**   This flag is set when sub-cursor CURSOR_DATA_SYSTEM is actually
**   pointing at a free list.
**
** CURSOR_AT_LEVELS
**   This flag is set when sub-cursor CURSOR_DATA_SYSTEM is actually
**   pointing at a levels record.
**
** CURSOR_IGNORE_SYSTEM
**   If set, this cursor ignores system keys.
**
** CURSOR_NEXT_OK
**   Set if it is Ok to call lsm_csr_next().
**
** CURSOR_PREV_OK
**   Set if it is Ok to call lsm_csr_prev().
*/
#define CURSOR_IGNORE_DELETE    0x00000001
#define CURSOR_NEW_SYSTEM       0x00000002
#define CURSOR_AT_FREELIST      0x00000004
#define CURSOR_AT_LEVELS        0x00000008
#define CURSOR_IGNORE_SYSTEM    0x00000010
#define CURSOR_NEXT_OK          0x00000020
#define CURSOR_PREV_OK          0x00000040

typedef struct MergeWorker MergeWorker;
typedef struct Hierarchy Hierarchy;

................................................................................
}

/*
** Configure cursor pCsr so that it also visits new system data, by
** setting the CURSOR_NEW_SYSTEM flag. The flag is set unconditionally.
**
** When the system sub-cursor is positioned on the free-list, the key it
** reports is a system key (SORTED_SYSTEM_WRITE) named "FREELIST".
*/
static void multiCursorVisitFreelist(MultiCursor *pCsr){
  assert( pCsr!=0 );
  pCsr->flags = (pCsr->flags | CURSOR_NEW_SYSTEM);
}

/*
** Allocate a new cursor to read the database (the in-memory tree and all
** levels). If successful, set *ppCsr to point to the new cursor object
** and return SQLITE4_OK. Otherwise, set *ppCsr to NULL and return an
................................................................................

    case CURSOR_DATA_SYSTEM:
      if( pCsr->flags & CURSOR_AT_FREELIST ){
        pKey = (void *)"FREELIST";
        nKey = 8;
        eType = SORTED_SYSTEM_WRITE;
      }
      else if( pCsr->flags & CURSOR_AT_LEVELS ){
        pKey = (void *)"LEVELS";
        nKey = 6;
        eType = SORTED_SYSTEM_WRITE;
      }
      break;

    default: {
      int iSeg = iKey - CURSOR_DATA_SEGMENT;
      if( iSeg==pCsr->nSegCsr && pCsr->pBtCsr ){
        pKey = pCsr->pBtCsr->pKey;
        nKey = pCsr->pBtCsr->nKey;
................................................................................
    if( lsmTreeCursorValid(pCsr->pTreeCsr) ){
      lsmTreeCursorValue(pCsr->pTreeCsr, ppVal, pnVal);
    }else{
      *ppVal = 0;
      *pnVal = 0;
    }
  }else if( iVal==CURSOR_DATA_SYSTEM ){
    if( 0 && (pCsr->flags & CURSOR_AT_FREELIST) ){
      u32 *aVal;
      int nVal;

      assert( pCsr->pSystemVal==0 );
      rc = lsmGetFreelist(pCsr->pDb, &aVal, &nVal);
      pCsr->pSystemVal = (void *)aVal;
      if( nVal<(3*LSM_CKPT_MIN_NONLSM) ){
        *pnVal = 0;
        *ppVal = 0;
      }else{
        *ppVal = (void *)&aVal[3*LSM_CKPT_MIN_NONLSM];
        *pnVal = sizeof(u32) * (nVal - 3*LSM_CKPT_MIN_NONLSM);
      }
      lsmFreelistDeltaBegin(pCsr->pDb);
    }else if( (pCsr->flags & CURSOR_AT_LEVELS) && pCsr->nLsmLevel>0 ){
      lsmFree(pCsr->pDb->pEnv, pCsr->pSystemVal);
      lsmCheckpointLevels(pCsr->pDb, pCsr->nLsmLevel, ppVal, pnVal);
      pCsr->pSystemVal = *ppVal;
    }else{
      *ppVal = 0;
      *pnVal = 0;
    }
  }else if( iVal-CURSOR_DATA_SEGMENT<pCsr->nSegCsr 
         && segmentCursorValid(&pCsr->aSegCsr[iVal-CURSOR_DATA_SEGMENT]) 
  ){
................................................................................
    *ppVal = 0;
    *pnVal = 0;
  }
  assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) );
  return rc;
}

/*
** Load system records stored at the end of the database into connection
** pDb. The caller must hold a worker snapshot (pDb->pWorker is asserted).
**
** A cursor is moved to the last entry. If that entry is the system key
** "LEVELS", its value is passed to lsmCheckpointLoadLevels() and the
** cursor steps back one entry. If the current entry is then the system
** key "FREELIST", its value is passed to lsmSetFreelist() as an array of
** u32 values.
**
** Return LSM_OK on success, or the first error code encountered.
*/
int lsmSortedLoadSystem(lsm_db *pDb){
  MultiCursor *pCur = 0;          /* Cursor used to retrieve system records */
  void *pBlob;                    /* Value read from database */
  int nBlob;                      /* Size of pBlob in bytes */
  int rc;                         /* Return Code */

  assert( pDb->pWorker );

  rc = multiCursorAllocate(pDb, 1, &pCur);
  if( rc!=LSM_OK ) return rc;

  rc = lsmMCursorLast(pCur);

  /* The levels record, if present, is the final entry. */
  if( rc==LSM_OK
   && pCur->eType==SORTED_SYSTEM_WRITE
   && pCur->key.nData==6
   && memcmp(pCur->key.pData, "LEVELS", 6)==0
  ){
    rc = lsmMCursorValue(pCur, &pBlob, &nBlob);
    if( rc==LSM_OK ) rc = lsmCheckpointLoadLevels(pDb, pBlob, nBlob);
    if( rc==LSM_OK ) rc = lsmMCursorPrev(pCur);
  }

  /* The free-list record, if present, is now the current entry. */
  if( rc==LSM_OK
   && pCur->eType==SORTED_SYSTEM_WRITE
   && pCur->key.nData==8
   && memcmp(pCur->key.pData, "FREELIST", 8)==0
  ){
    rc = lsmMCursorValue(pCur, &pBlob, &nBlob);
    if( rc==LSM_OK ){
      int nInt = nBlob / sizeof(u32);
      rc = lsmSetFreelist(pDb, (u32 *)pBlob, nInt);
    }
  }

  lsmMCursorClose(pCur);
  return rc;
}

static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){
  int i1;
  int i2;
  int iRes;
................................................................................
  int iPtr = 0; 

  if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;
  assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );

  assert( (pCsr->flags & CURSOR_NEW_SYSTEM)==0 );
  assert( (pCsr->flags & CURSOR_AT_FREELIST)==0 );
  assert( (pCsr->flags & CURSOR_AT_LEVELS)==0 );

  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
  lsmTreeCursorSeek(pCsr->pTreeCsr, pKey, nKey, &res);
  switch( eESeek ){
    case LSM_SEEK_EQ:
      if( res!=0 ){
        lsmTreeCursorReset(pCsr->pTreeCsr);
................................................................................
      if( iKey==CURSOR_DATA_TREE ){
        if( bReverse ){
          rc = lsmTreeCursorPrev(pCsr->pTreeCsr);
        }else{
          rc = lsmTreeCursorNext(pCsr->pTreeCsr);
        }
      }else if( iKey==CURSOR_DATA_SYSTEM ){
        assert( pCsr->flags & (CURSOR_AT_FREELIST | CURSOR_AT_LEVELS) );
        assert( pCsr->flags & CURSOR_NEW_SYSTEM );
        assert( bReverse==0 );

        if( pCsr->flags & CURSOR_AT_FREELIST ){
          pCsr->flags &= ~CURSOR_AT_FREELIST;
          pCsr->flags |= CURSOR_AT_LEVELS;
        }else{
          pCsr->flags &= ~CURSOR_AT_LEVELS;
        }
      }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nSegCsr) ){
        assert( bReverse==0 && pCsr->pBtCsr );
        rc = btreeCursorNext(pCsr->pBtCsr);
      }else{
        LevelCursor *pLevel = &pCsr->aSegCsr[iKey-CURSOR_DATA_SEGMENT];
        rc = segmentCursorAdvance(pLevel, bReverse);
      }
................................................................................

/*
** Invoke the work-hook callback pDb->xWork, if one is set, passing it the
** connection handle and the context pointer pDb->pWorkCtx.
*/
static void sortedInvokeWorkHook(lsm_db *pDb){
  if( pDb->xWork==0 ) return;
  pDb->xWork(pDb, pDb->pWorkCtx);
}

int lsmSortedNewToplevel(
  lsm_db *pDb,                    /* Connection handle */
  int nLevel,                     /* Number of levels store in LSM (often 0) */
  int bFreelist                   /* True to store the freelist in the LSM */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int iLeftPtr = 0;



  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);

  /* Create a cursor to gather the data required by the new segment. The new
  ** segment contains everything in the tree and pointers to the next segment
  ** in the database (if any).  */
  if( rc==LSM_OK ){



    pNew->pNext = pNext;
    lsmDbSnapshotSetLevel(pDb->pWorker, pNew);

    /* TODO: Fix the third parameter to the following */
    rc = multiCursorNew(pDb, pDb->pWorker, 1, 0, &pCsr);
    if( rc==LSM_OK ){
      if( pNext ){
        assert( pNext->pMerge==0 || pNext->nRight>0 );
        if( pNext->pMerge==0 ){
          if( pNext->lhs.iRoot ){
            rc = multiCursorAddLevel(pCsr, pNext, MULTICURSOR_ADDLEVEL_LHS_SEP);
            if( rc==LSM_OK ){
................................................................................
        /* The new level will be the only level in the LSM. There is no reason
         ** to write out delete keys in this case.  */
        multiCursorIgnoreDelete(pCsr);
      }
    }

    if( rc==LSM_OK ){
      assert( bFreelist || nLevel==0 );
      if( bFreelist ){
        multiCursorVisitFreelist(pCsr);
      }
      multiCursorReadSeparators(pCsr);
      pCsr->nLsmLevel = nLevel;
    }
  }

  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr);
  }else{
    Merge merge;                  /* Merge object used to create new level */
................................................................................
**
** In both cases, the connection holds a worker snapshot reference. In
** the first, the connection also holds the in-memory tree write-version.
** In the second, no in-memory tree version reference is held at all.
*/
/*
** Write a new top-level segment by calling lsmSortedNewToplevel(), unless
** there is nothing to write: the in-memory tree is empty (lsmTreeSize()
** returns 0) and bFreelist is zero. In that case return LSM_OK at once.
**
** The caller must hold a worker snapshot (pDb->pWorker is asserted).
** Returns LSM_OK or the error code from lsmSortedNewToplevel().
*/
int lsmSortedFlushTree(
  lsm_db *pDb,                    /* Connection handle */
  int nLevel,                     /* Passed through to lsmSortedNewToplevel() */
  int bFreelist                   /* Passed through to lsmSortedNewToplevel() */
){
  int rc = LSM_OK;

  assert( pDb->pWorker );

  /* Only do work if the tree is non-empty or a free-list was requested. */
  if( lsmTreeSize(pDb)!=0 || bFreelist!=0 ){
    rc = lsmSortedNewToplevel(pDb, nLevel, bFreelist);
    assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );

#if 0
    lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "tree flush");
#endif
  }

  return rc;
}
................................................................................
      lsmFinishReadTrans(pDb);
    }
  }

  if( rc==LSM_OK && nPage>0 ){
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    int nWrite = 0;


    assert( pDb->pWorker==0 );
    rc = lsmBeginWork(pDb);
    if( rc==LSM_OK ){
      rc = sortedWork(pDb, nPage, bOptimize, &nWrite);
    }

    if( rc==LSM_OK && nWrite && (flags & LSM_WORK_CHECKPOINT) ){
      int bOvfl;
      int nLsm;

      bOvfl = lsmCheckpointOverflow(pDb, &nLsm);
      rc = lsmSortedFlushDb(pDb);
      if( rc==LSM_OK && bOvfl ) rc = lsmSortedNewToplevel(pDb, nLsm, bOvfl);
    }

    lsmFinishWork(pDb, 0, &rc);



    assert( pDb->pWorker==0 );
    if( pnWrite ) *pnWrite = nWrite;
  }else if( pnWrite ){
    *pnWrite = 0;
  }

  /* If the LSM_WORK_CHECKPOINT flag is specified and one is available,







|







 







<
<
<
<












<







 







|

>







 







<
<
<
<
<







 







|
|



|
|
<
|
<
<
<
<
<

<
<
<
<







 







|
>
>
>
>




>
>


<
<

<
<
<
<
<
<
<
<
<
<
<
<
<
<





>


<
|
>
>
>
>





>







 







<







 







|


<
<
|
<
<
<
<







 







|

|
|







>
>









<
>
>
|
|
|
<
<







 







<
<
|
<

<







 







|
<






|
>
>
>
>
>
|
>
>
|







 







>







|
|
|
|
|
|
|
|
|
|
>
>
>







260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
...
284
285
286
287
288
289
290




291
292
293
294
295
296
297
298
299
300
301
302

303
304
305
306
307
308
309
....
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
....
2133
2134
2135
2136
2137
2138
2139





2140
2141
2142
2143
2144
2145
2146
....
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182

2183





2184




2185
2186
2187
2188
2189
2190
2191
....
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213


2214














2215
2216
2217
2218
2219
2220
2221
2222

2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
....
2429
2430
2431
2432
2433
2434
2435

2436
2437
2438
2439
2440
2441
2442
....
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566


2567




2568
2569
2570
2571
2572
2573
2574
....
3451
3452
3453
3454
3455
3456
3457
3458
3459
3460
3461
3462
3463
3464
3465
3466
3467
3468
3469
3470
3471
3472
3473
3474
3475
3476
3477
3478
3479

3480
3481
3482
3483
3484


3485
3486
3487
3488
3489
3490
3491
....
3498
3499
3500
3501
3502
3503
3504


3505

3506

3507
3508
3509
3510
3511
3512
3513
....
3567
3568
3569
3570
3571
3572
3573
3574

3575
3576
3577
3578
3579
3580
3581
3582
3583
3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597
....
4041
4042
4043
4044
4045
4046
4047
4048
4049
4050
4051
4052
4053
4054
4055
4056
4057
4058
4059
4060
4061
4062
4063
4064
4065
4066
4067
4068
4069
4070
4071
4072
4073
4074
4075
  int nTree;
  int *aTree;
  BtreeCursor *pBtCsr;

  Snapshot *pSnap;

  /* Used by cursors flushing the in-memory tree only */
  int *pnOvfl;                    /* Number of free-list entries to store */
  void *pSystemVal;               /* Pointer to buffer to free */
};

#define CURSOR_DATA_TREE      0
#define CURSOR_DATA_SYSTEM    1
#define CURSOR_DATA_SEGMENT   2

................................................................................
**   flushing the in-memory tree to disk - the new free-list and levels record
**   are flushed along with it.
**
** CURSOR_AT_FREELIST
**   This flag is set when sub-cursor CURSOR_DATA_SYSTEM is actually
**   pointing at a free list.
**




** CURSOR_IGNORE_SYSTEM
**   If set, this cursor ignores system keys.
**
** CURSOR_NEXT_OK
**   Set if it is Ok to call lsm_csr_next().
**
** CURSOR_PREV_OK
**   Set if it is Ok to call lsm_csr_prev().
*/
#define CURSOR_IGNORE_DELETE    0x00000001
#define CURSOR_NEW_SYSTEM       0x00000002
#define CURSOR_AT_FREELIST      0x00000004

#define CURSOR_IGNORE_SYSTEM    0x00000010
#define CURSOR_NEXT_OK          0x00000020
#define CURSOR_PREV_OK          0x00000040

typedef struct MergeWorker MergeWorker;
typedef struct Hierarchy Hierarchy;

................................................................................
}

/*
** Configure cursor pCsr so that it also visits new system data, by
** setting the CURSOR_NEW_SYSTEM flag. The flag is set unconditionally.
**
** Pointer pnOvfl is stored in the cursor. It is later handed to
** lsmCheckpointOverflow() when the value of the system key "FREELIST"
** is read through this cursor.
*/
static void multiCursorVisitFreelist(MultiCursor *pCsr, int *pnOvfl){
  assert( pCsr!=0 );
  pCsr->flags = (pCsr->flags | CURSOR_NEW_SYSTEM);
  pCsr->pnOvfl = pnOvfl;
}

/*
** Allocate a new cursor to read the database (the in-memory tree and all
** levels). If successful, set *ppCsr to point to the new cursor object
** and return SQLITE4_OK. Otherwise, set *ppCsr to NULL and return an
................................................................................

    case CURSOR_DATA_SYSTEM:
      if( pCsr->flags & CURSOR_AT_FREELIST ){
        pKey = (void *)"FREELIST";
        nKey = 8;
        eType = SORTED_SYSTEM_WRITE;
      }





      break;

    default: {
      int iSeg = iKey - CURSOR_DATA_SEGMENT;
      if( iSeg==pCsr->nSegCsr && pCsr->pBtCsr ){
        pKey = pCsr->pBtCsr->pKey;
        nKey = pCsr->pBtCsr->nKey;
................................................................................
    if( lsmTreeCursorValid(pCsr->pTreeCsr) ){
      lsmTreeCursorValue(pCsr->pTreeCsr, ppVal, pnVal);
    }else{
      *ppVal = 0;
      *pnVal = 0;
    }
  }else if( iVal==CURSOR_DATA_SYSTEM ){
    if( pCsr->flags & CURSOR_AT_FREELIST ){
      void *aVal;
      int nVal;

      assert( pCsr->pSystemVal==0 );
      rc = lsmCheckpointOverflow(pCsr->pDb, &aVal, &nVal, pCsr->pnOvfl);
      *ppVal = pCsr->pSystemVal = aVal;

      *pnVal = nVal;





      lsmFreelistDeltaBegin(pCsr->pDb);




    }else{
      *ppVal = 0;
      *pnVal = 0;
    }
  }else if( iVal-CURSOR_DATA_SEGMENT<pCsr->nSegCsr 
         && segmentCursorValid(&pCsr->aSegCsr[iVal-CURSOR_DATA_SEGMENT]) 
  ){
................................................................................
    *ppVal = 0;
    *pnVal = 0;
  }
  assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) );
  return rc;
}

/*
** Read the free-list record stored at the end of the database, if any.
**
** A cursor is moved to the last entry. If that entry is the system key
** "FREELIST", a copy of its value is made with lsmMallocRc() and returned
** via *ppVal, with its size in bytes in *pnVal. Otherwise the output
** parameters keep their initial zero values, which this function asserts
** on entry.
**
** The caller must hold a worker snapshot (pDb->pWorker is asserted).
** Return LSM_OK on success, or the first error code encountered.
*/
int lsmSortedLoadFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  void **ppVal,                   /* OUT: Blob containing LSM free-list */
  int *pnVal                      /* OUT: Size of *ppVal blob in bytes */
){
  MultiCursor *pCur = 0;          /* Cursor used to retrieve free-list */
  int rc;                         /* Return Code */

  assert( pDb->pWorker );
  assert( *ppVal==0 && *pnVal==0 );

  rc = multiCursorAllocate(pDb, 1, &pCur);
  if( rc!=LSM_OK ) return rc;

  rc = lsmMCursorLast(pCur);
  if( rc==LSM_OK ){
    int bFreelist = (
        pCur->eType==SORTED_SYSTEM_WRITE
     && pCur->key.nData==8
     && memcmp(pCur->key.pData, "FREELIST", 8)==0
    );

    if( bFreelist ){
      void *pBlob; int nBlob;     /* Value read from database */
      rc = lsmMCursorValue(pCur, &pBlob, &nBlob);
      if( rc==LSM_OK ){
        /* Copy the value before the cursor is closed below. */
        void *pCopy = lsmMallocRc(pDb->pEnv, nBlob, &rc);
        *ppVal = pCopy;
        if( pCopy ){
          memcpy(pCopy, pBlob, nBlob);
          *pnVal = nBlob;
        }
      }
    }
  }

  lsmMCursorClose(pCur);
  return rc;
}

static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){
  int i1;
  int i2;
  int iRes;
................................................................................
  int iPtr = 0; 

  if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;
  assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );

  assert( (pCsr->flags & CURSOR_NEW_SYSTEM)==0 );
  assert( (pCsr->flags & CURSOR_AT_FREELIST)==0 );


  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
  lsmTreeCursorSeek(pCsr->pTreeCsr, pKey, nKey, &res);
  switch( eESeek ){
    case LSM_SEEK_EQ:
      if( res!=0 ){
        lsmTreeCursorReset(pCsr->pTreeCsr);
................................................................................
      if( iKey==CURSOR_DATA_TREE ){
        if( bReverse ){
          rc = lsmTreeCursorPrev(pCsr->pTreeCsr);
        }else{
          rc = lsmTreeCursorNext(pCsr->pTreeCsr);
        }
      }else if( iKey==CURSOR_DATA_SYSTEM ){
        assert( pCsr->flags & CURSOR_AT_FREELIST );
        assert( pCsr->flags & CURSOR_NEW_SYSTEM );
        assert( bReverse==0 );


        pCsr->flags &= ~CURSOR_AT_FREELIST;




      }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nSegCsr) ){
        assert( bReverse==0 && pCsr->pBtCsr );
        rc = btreeCursorNext(pCsr->pBtCsr);
      }else{
        LevelCursor *pLevel = &pCsr->aSegCsr[iKey-CURSOR_DATA_SEGMENT];
        rc = segmentCursorAdvance(pLevel, bReverse);
      }
................................................................................

/*
** Invoke the work-hook callback pDb->xWork, if one is set, passing it the
** connection handle and the context pointer pDb->pWorkCtx.
*/
static void sortedInvokeWorkHook(lsm_db *pDb){
  if( pDb->xWork==0 ) return;
  pDb->xWork(pDb, pDb->pWorkCtx);
}

static int sortedNewToplevel(
  lsm_db *pDb,                    /* Connection handle */
  int bTree,                      /* True to store contents of in-memory tree */
  int *pnOvfl                     /* OUT: Number of free-list entries stored */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int iLeftPtr = 0;

  assert( pnOvfl );

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);

  /* Create a cursor to gather the data required by the new segment. The new
  ** segment contains everything in the tree and pointers to the next segment
  ** in the database (if any).  */
  if( rc==LSM_OK ){

    rc = multiCursorNew(pDb, pDb->pWorker, bTree, 0, &pCsr);
    if( rc==LSM_OK ){
      pNew->pNext = pNext;
      lsmDbSnapshotSetLevel(pDb->pWorker, pNew);
    }


    if( rc==LSM_OK ){
      if( pNext ){
        assert( pNext->pMerge==0 || pNext->nRight>0 );
        if( pNext->pMerge==0 ){
          if( pNext->lhs.iRoot ){
            rc = multiCursorAddLevel(pCsr, pNext, MULTICURSOR_ADDLEVEL_LHS_SEP);
            if( rc==LSM_OK ){
................................................................................
        /* The new level will be the only level in the LSM. There is no reason
         ** to write out delete keys in this case.  */
        multiCursorIgnoreDelete(pCsr);
      }
    }

    if( rc==LSM_OK ){


      multiCursorVisitFreelist(pCsr, pnOvfl);

      multiCursorReadSeparators(pCsr);

    }
  }

  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr);
  }else{
    Merge merge;                  /* Merge object used to create new level */
................................................................................
**
** In both cases, the connection holds a worker snapshot reference. In
** the first, the connection also holds the in-memory tree write-version.
** In the second, no in-memory tree version reference is held at all.
*/
/*
** Write a new top-level segment by calling sortedNewToplevel() with
** bTree==1, unless there is nothing to write. There is nothing to write
** when the in-memory tree is empty (lsmTreeSize() returns 0) and
** lsmCheckpointOverflow() reports zero overflow entries. In that case
** *pnOvfl is set to 0 and LSM_OK is returned at once.
**
** The caller must hold a worker snapshot (pDb->pWorker is asserted).
** Returns LSM_OK or the error code from sortedNewToplevel().
*/
int lsmSortedFlushTree(
  lsm_db *pDb,                    /* Connection handle */
  int *pnOvfl                     /* OUT: Number of free-list entries written */
){
  int rc;
  int bWork = 1;                  /* False if there is nothing to flush */

  assert( pDb->pWorker );

  /* With an empty tree, work is required only for free-list overflow. */
  if( lsmTreeSize(pDb)==0 ){
    int nPending = 0;
    lsmCheckpointOverflow(pDb, 0, 0, &nPending);
    bWork = (nPending!=0);
  }
  if( !bWork ){
    *pnOvfl = 0;
    return LSM_OK;
  }

  rc = sortedNewToplevel(pDb, 1, pnOvfl);
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "tree flush");
#endif
  return rc;
}
................................................................................
      lsmFinishReadTrans(pDb);
    }
  }

  if( rc==LSM_OK && nPage>0 ){
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    int nWrite = 0;
    int nOvfl = -1;

    assert( pDb->pWorker==0 );
    rc = lsmBeginWork(pDb);
    if( rc==LSM_OK ){
      rc = sortedWork(pDb, nPage, bOptimize, &nWrite);
    }

    if( rc==LSM_OK && nWrite ){
      lsmCheckpointOverflow(pDb, 0, 0, &nOvfl);
      rc = lsmSortedFlushDb(pDb);
      if( rc==LSM_OK && nOvfl ) rc = sortedNewToplevel(pDb, 0, &nOvfl);
    }

    if( nWrite ){
      lsmFinishWork(pDb, 0, nOvfl, &rc);
    }else{
      int rcdummy = LSM_BUSY;
      lsmFinishWork(pDb, 0, 0, &rcdummy);
    }

    assert( pDb->pWorker==0 );
    if( pnWrite ) *pnWrite = nWrite;
  }else if( pnWrite ){
    *pnWrite = 0;
  }

  /* If the LSM_WORK_CHECKPOINT flag is specified and one is available,

Changes to test/ckpt1.test.

38
39
40
41
42
43
44




45







46



































47

  db eval { 
    SELECT count(*) FROM t1;
    PRAGMA integrity_check;
  }
} [list $nLevel ok]


















































finish_test








>
>
>
>
|
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

>
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
  db eval { 
    SELECT count(*) FROM t1;
    PRAGMA integrity_check;
  }
} [list $nLevel ok]


#-------------------------------------------------------------------------
# The point of this test is to add a large number of blocks to the 
# free-block list and check that this doesn't seem to cause any
# obvious problems.
#
# 3.0: Start from a fresh test.db opened with lsm_block_size=65536 and
# create table t1 plus index i1.
do_test 3.0 {
  db close
  forcedelete test.db
  sqlite4 db file:test.db?lsm_block_size=65536
  execsql { 
    CREATE TABLE t1(a PRIMARY KEY, b);
    CREATE INDEX i1 ON t1(b);
  }
} {}
# 3.1: Insert one row, then double the row count 16 times (65536 rows).
do_execsql_test 3.1 {
  INSERT INTO t1 VALUES(randstr(100,100), randstr(100,100));
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --   2
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --   4
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --   8
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  16
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  32
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  64
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 128
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 256
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 512
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  1K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  2K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  4K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  8K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 16K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 32K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 64K
}
# 3.2: Run sqlite4_lsm_work with -optimize and check that all rows are
# still visible.
do_test 3.2 {
  sqlite4_lsm_work db main -optimize 1000000
  execsql { SELECT count(*) FROM t1 }
} {65536}
# 3.3: Close and reopen the database; the row count must be unchanged.
do_test 3.3 {
  db close
  sqlite4 db test.db
  execsql { SELECT count(*) FROM t1 }
} {65536}
# 3.4: Insert one more row after reopening, run the work command again,
# and check the new row count.
do_test 3.4 {
  execsql { INSERT INTO t1 VALUES(randstr(100,100), randstr(100,100)) }
  sqlite4_lsm_work db main -optimize 1000000
  execsql { SELECT count(*) FROM t1 }
} {65537}

finish_test