SQLite4
Check-in [d6c6889249]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Remove dead code. Run "lomem" tests with max-freelist set to 4.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | multi-process
Files: files | file ages | folders
SHA1: d6c6889249fa74dc4516c189b3a2876a3c8045d3
User & Date: dan 2012-08-30 20:01:02
Context
2012-08-31
18:43
Add DMS "lock". check-in: 5f4708d2e9 user: dan tags: multi-process
2012-08-30
20:01
Remove dead code. Run "lomem" tests with max-freelist set to 4. check-in: d6c6889249 user: dan tags: multi-process
18:01
Fix memory leaks. Add the LSM_CONFIG_MAX_FREELIST parameter to make testing free-list overflow easier. check-in: 3e1ecb95c9 user: dan tags: multi-process
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_main.c.

459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
  return (nFail!=0);
}

static lsm_db *configure_lsm_db(TestDb *pDb){
  lsm_db *pLsm;
  pLsm = tdb_lsm(pDb);
  if( pLsm ){
    tdb_lsm_config_str(pDb, "mmap=1 autowork=1 nmerge=4 worker_nmerge=4");
  }
  return pLsm;
}


static void do_speed_write_hook2(
  void *pCtx,







|







459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
  return (nFail!=0);
}

static lsm_db *configure_lsm_db(TestDb *pDb){
  lsm_db *pLsm;
  pLsm = tdb_lsm(pDb);
  if( pLsm ){
    tdb_lsm_config_str(pDb, "mmap=0 autowork=1 nmerge=4 worker_nmerge=4");
  }
  return pLsm;
}


static void do_speed_write_hook2(
  void *pCtx,

Changes to src/lsmInt.h.

453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471

472


473
474
475
476
477
478
479
...
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
  i64 iId;                        /* Snapshot id */

  /* Used by worker snapshots only */
  int nBlock;                     /* Number of blocks in database file */
  u32 aiAppend[LSM_APPLIST_SZ];   /* Append point list */
  Freelist freelist;              /* Free block list */
  int nFreelistOvfl;              /* Number of extra free-list entries in LSM */

  int nFreelistDelta;
  int bRecordDelta;
};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *);
int lsmCheckpointLevels(lsm_db *, int, void **, int *);
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);

int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *);



int lsmCheckpointRecover(lsm_db *);
int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);

int lsmCheckpointLoad(lsm_db *pDb);
int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);
................................................................................
void lsmLogCheckpoint(lsm_db *, lsm_i64);
int lsmLogStructure(lsm_db *pDb, char **pzVal);


/**************************************************************************
** Functions from file "lsm_shared.c".
*/
int lsmGetFreelist(lsm_db *pDb, u32 **paFree, int *pnFree);

int lsmDbDatabaseFind(lsm_db*, const char *);
void lsmDbDatabaseRelease(lsm_db *);

int lsmBeginRecovery(lsm_db *);
int lsmBeginReadTrans(lsm_db *);
int lsmBeginWriteTrans(lsm_db *);
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
void lsmFinishWork(lsm_db *, int, int, int *);

int lsmFinishRecovery(lsm_db *);
void lsmFinishReadTrans(lsm_db *);
int lsmFinishWriteTrans(lsm_db *, int);
int lsmFinishFlush(lsm_db *, int);

int lsmSnapshotFreelist(lsm_db *, int **, int *);
int lsmSnapshotSetFreelist(lsm_db *, int *, int);

Snapshot *lsmDbSnapshotClient(lsm_db *);
Snapshot *lsmDbSnapshotWorker(lsm_db *);
void lsmDbSnapshotRelease(lsm_env *pEnv, Snapshot *);

void lsmSnapshotSetNBlock(Snapshot *, int);
void lsmSnapshotSetCkptid(Snapshot *, i64);

Level *lsmDbSnapshotLevel(Snapshot *);
void lsmDbSnapshotSetLevel(Snapshot *, Level *);

void lsmDbRecoveryComplete(lsm_db *, int);








<
<
<









>

>
>







 







<




<












<




<

<







453
454
455
456
457
458
459



460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
...
714
715
716
717
718
719
720

721
722
723
724

725
726
727
728
729
730
731
732
733
734
735
736

737
738
739
740

741

742
743
744
745
746
747
748
  i64 iId;                        /* Snapshot id */

  /* Used by worker snapshots only */
  int nBlock;                     /* Number of blocks in database file */
  u32 aiAppend[LSM_APPLIST_SZ];   /* Append point list */
  Freelist freelist;              /* Free block list */
  int nFreelistOvfl;              /* Number of extra free-list entries in LSM */



};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *);
int lsmCheckpointLevels(lsm_db *, int, void **, int *);
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);

int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *);
int lsmCheckpointOverflowRequired(lsm_db *pDb);
int lsmCheckpointOverflowLoad(lsm_db *pDb, Freelist *);

int lsmCheckpointRecover(lsm_db *);
int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);

int lsmCheckpointLoad(lsm_db *pDb);
int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);
................................................................................
void lsmLogCheckpoint(lsm_db *, lsm_i64);
int lsmLogStructure(lsm_db *pDb, char **pzVal);


/**************************************************************************
** Functions from file "lsm_shared.c".
*/


int lsmDbDatabaseFind(lsm_db*, const char *);
void lsmDbDatabaseRelease(lsm_db *);


int lsmBeginReadTrans(lsm_db *);
int lsmBeginWriteTrans(lsm_db *);
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
void lsmFinishWork(lsm_db *, int, int, int *);

int lsmFinishRecovery(lsm_db *);
void lsmFinishReadTrans(lsm_db *);
int lsmFinishWriteTrans(lsm_db *, int);
int lsmFinishFlush(lsm_db *, int);


int lsmSnapshotSetFreelist(lsm_db *, int *, int);

Snapshot *lsmDbSnapshotClient(lsm_db *);
Snapshot *lsmDbSnapshotWorker(lsm_db *);



void lsmSnapshotSetCkptid(Snapshot *, i64);

Level *lsmDbSnapshotLevel(Snapshot *);
void lsmDbSnapshotSetLevel(Snapshot *, Level *);

void lsmDbRecoveryComplete(lsm_db *, int);

Changes to src/lsm_ckpt.c.

648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672

673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690


691
692
693
694
695
696
697
698
699
700
701

702
703
704
705
706
707
708













709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
...
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743

  return rc;
}

/*
** The worker lock must be held to call this function.
**
** The function is used to determine if the FREELIST record is required
** to store the free-block list. If not, zero is returned. Otherwise,
** the value returned is the number of free-list elements that should be
** saved in the LSM structure.
*/
int lsmCheckpointOverflow(
  lsm_db *pDb,                    /* Database handle (must hold worker lock) */
  void **ppVal,                   /* OUT: lsmMalloc'd buffer */
  int *pnVal,                     /* OUT: Size of *ppVal in bytes */
  int *pnOvfl                     /* OUT: Number of freelist entries in buf */
){
  int rc = LSM_OK;
  int nRet;
  Snapshot *p = pDb->pWorker;

  assert( lsmShmAssertWorker(pDb) );
  assert( (pnVal==0)==(ppVal==0) );
  assert( pnOvfl );


  if( ppVal && p->nFreelistOvfl ){
    rc = lsmCheckpointLoadOverflow(pDb, &p->freelist);
    if( rc!=LSM_OK ) return rc;
    p->nFreelistOvfl = 0;
  }

  nRet = p->freelist.nEntry - pDb->nMaxFreelist;
  if( nRet<=0 ){
    nRet = 0;
    if( ppVal ){
      *pnVal = 0;
      *ppVal = 0;
    }
  }else if( ppVal ){
    int i;                        /* Iterator variable */
    int iOut = 0;                 /* Current size of blob in ckpt */
    CkptBuffer ckpt;              /* Used to build FREELIST blob */



    memset(&ckpt, 0, sizeof(CkptBuffer));
    ckpt.pEnv = pDb->pEnv;
    for(i=p->freelist.nEntry-nRet; rc==LSM_OK && i<p->freelist.nEntry; i++){
      FreelistEntry *pEntry = &p->freelist.aEntry[i];
      ckptSetValue(&ckpt, iOut++, pEntry->iBlk, &rc);
      ckptSetValue(&ckpt, iOut++, (pEntry->iId >> 32) & 0xFFFFFFFF, &rc);
      ckptSetValue(&ckpt, iOut++, pEntry->iId & 0xFFFFFFFF, &rc);
    }

    ckptChangeEndianness(ckpt.aCkpt, iOut);

    *ppVal = ckpt.aCkpt;
    *pnVal = iOut*sizeof(u32);
  }

  *pnOvfl = nRet;
  return rc;
}














/*
** Connection pDb must be the worker to call this function.
**
** Load the FREELIST record from the database. Decode it and append the
** results to list pFreelist.
*/
int lsmCheckpointLoadOverflow(
  lsm_db *pDb,
  Freelist *pFreelist
){
  int rc;
  int nVal = 0;
  void *pVal = 0;
  assert( lsmShmAssertWorker(pDb) );
................................................................................
    int nFree = nVal / sizeof(int);

    ckptChangeEndianness(aFree, nFree);
    if( (nFree % 3) ){
      rc = LSM_CORRUPT_BKPT;
    }else{
      int i;
      int nLoad = nFree/3;

      for(i=0; rc==LSM_OK && i<nFree; i+=3){
        int iBlk = aFree[i];
        i64 iId = ((i64)(aFree[i+1])<<32) + (i64)aFree[i+2];
        rc = lsmFreelistAppend(pDb->pEnv, pFreelist, iBlk, iId);
      }
    }








|
|
<
<












<
|
>

|
|




|
<

<
|
|
<
|



>
>









<

>







>
>
>
>
>
>
>
>
>
>
>
>
>







|







 







<
<







648
649
650
651
652
653
654
655
656


657
658
659
660
661
662
663
664
665
666
667
668

669
670
671
672
673
674
675
676
677
678

679

680
681

682
683
684
685
686
687
688
689
690
691
692
693
694
695
696

697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
...
738
739
740
741
742
743
744


745
746
747
748
749
750
751

  return rc;
}

/*
** The worker lock must be held to call this function.
**
** The function serializes and returns the data that should be stored as
** the FREELIST system record.


*/
int lsmCheckpointOverflow(
  lsm_db *pDb,                    /* Database handle (must hold worker lock) */
  void **ppVal,                   /* OUT: lsmMalloc'd buffer */
  int *pnVal,                     /* OUT: Size of *ppVal in bytes */
  int *pnOvfl                     /* OUT: Number of freelist entries in buf */
){
  int rc = LSM_OK;
  int nRet;
  Snapshot *p = pDb->pWorker;

  assert( lsmShmAssertWorker(pDb) );

  assert( pnOvfl && ppVal && pnVal );
  assert( pDb->nMaxFreelist>=2 && pDb->nMaxFreelist<=LSM_MAX_FREELIST_ENTRIES );

  if( p->nFreelistOvfl ){
    rc = lsmCheckpointOverflowLoad(pDb, &p->freelist);
    if( rc!=LSM_OK ) return rc;
    p->nFreelistOvfl = 0;
  }

  if( p->freelist.nEntry<=pDb->nMaxFreelist ){

    nRet = 0;

    *pnVal = 0;
    *ppVal = 0;

  }else{
    int i;                        /* Iterator variable */
    int iOut = 0;                 /* Current size of blob in ckpt */
    CkptBuffer ckpt;              /* Used to build FREELIST blob */

    nRet = (p->freelist.nEntry - pDb->nMaxFreelist);

    memset(&ckpt, 0, sizeof(CkptBuffer));
    ckpt.pEnv = pDb->pEnv;
    for(i=p->freelist.nEntry-nRet; rc==LSM_OK && i<p->freelist.nEntry; i++){
      FreelistEntry *pEntry = &p->freelist.aEntry[i];
      ckptSetValue(&ckpt, iOut++, pEntry->iBlk, &rc);
      ckptSetValue(&ckpt, iOut++, (pEntry->iId >> 32) & 0xFFFFFFFF, &rc);
      ckptSetValue(&ckpt, iOut++, pEntry->iId & 0xFFFFFFFF, &rc);
    }

    ckptChangeEndianness(ckpt.aCkpt, iOut);

    *ppVal = ckpt.aCkpt;
    *pnVal = iOut*sizeof(u32);
  }

  *pnOvfl = nRet;
  return rc;
}

/*
** The connection must be the worker in order to call this function.
**
** True is returned if there are currently too many free-list entries
** in-memory to store in a checkpoint. Before calling lsmCheckpointSaveWorker()
** to save the current worker snapshot, a new top-level LSM segment must
** be created so that some of them can be written to the LSM. 
*/
int lsmCheckpointOverflowRequired(lsm_db *pDb){
  assert( lsmShmAssertWorker(pDb) );
  return (pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist);
}

/*
** Connection pDb must be the worker to call this function.
**
** Load the FREELIST record from the database. Decode it and append the
** results to list pFreelist.
*/
int lsmCheckpointOverflowLoad(
  lsm_db *pDb,
  Freelist *pFreelist
){
  int rc;
  int nVal = 0;
  void *pVal = 0;
  assert( lsmShmAssertWorker(pDb) );
................................................................................
    int nFree = nVal / sizeof(int);

    ckptChangeEndianness(aFree, nFree);
    if( (nFree % 3) ){
      rc = LSM_CORRUPT_BKPT;
    }else{
      int i;


      for(i=0; rc==LSM_OK && i<nFree; i+=3){
        int iBlk = aFree[i];
        i64 iId = ((i64)(aFree[i+1])<<32) + (i64)aFree[i+2];
        rc = lsmFreelistAppend(pDb->pEnv, pFreelist, iBlk, iId);
      }
    }

Changes to src/lsm_file.c.

1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
....
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
  FileSystem *pFS, 
  Segment *pSeg,
  int bExtra,                     /* If true, count the "next" block if any */
  int nUsed,
  u8 *aUsed
){
  if( pSeg ){
    int i;

    if( pSeg && pSeg->nSize>0 ){
      const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);

      int iBlk;
      int iLastBlk;
      iBlk = fsPageToBlock(pFS, pSeg->iFirst);
      iLastBlk = fsPageToBlock(pFS, pSeg->iLast);
................................................................................
    checkBlocks(pFS, &pLevel->lhs, (pLevel->nRight!=0), nBlock, aUsed);
    for(i=0; i<pLevel->nRight; i++){
      checkBlocks(pFS, &pLevel->aRhs[i], 0, nBlock, aUsed);
    }
  }

  if( pWorker->nFreelistOvfl ){
    int rc = lsmCheckpointLoadOverflow(pDb, &freelist);
    assert( rc==LSM_OK || rc==LSM_NOMEM );
    if( rc!=LSM_OK ) return 1;
  }

  for(j=0; j<2; j++){
    Freelist *pFreelist;
    if( j==0 ) pFreelist = &pWorker->freelist;







<
<







 







|







1377
1378
1379
1380
1381
1382
1383


1384
1385
1386
1387
1388
1389
1390
....
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
  FileSystem *pFS, 
  Segment *pSeg,
  int bExtra,                     /* If true, count the "next" block if any */
  int nUsed,
  u8 *aUsed
){
  if( pSeg ){


    if( pSeg && pSeg->nSize>0 ){
      const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);

      int iBlk;
      int iLastBlk;
      iBlk = fsPageToBlock(pFS, pSeg->iFirst);
      iLastBlk = fsPageToBlock(pFS, pSeg->iLast);
................................................................................
    checkBlocks(pFS, &pLevel->lhs, (pLevel->nRight!=0), nBlock, aUsed);
    for(i=0; i<pLevel->nRight; i++){
      checkBlocks(pFS, &pLevel->aRhs[i], 0, nBlock, aUsed);
    }
  }

  if( pWorker->nFreelistOvfl ){
    int rc = lsmCheckpointOverflowLoad(pDb, &freelist);
    assert( rc==LSM_OK || rc==LSM_NOMEM );
    if( rc!=LSM_OK ) return 1;
  }

  for(j=0; j<2; j++){
    Freelist *pFreelist;
    if( j==0 ) pFreelist = &pWorker->freelist;

Changes to src/lsm_main.c.

88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
}

lsm_env *lsm_get_env(lsm_db *pDb){
  assert( pDb->pEnv );
  return pDb->pEnv;
}

/*
** Release snapshot handle *ppSnap. Then set *ppSnap to zero. This
** is useful for doing (say):
**
**   dbReleaseSnapshot(pDb->pEnv, &pDb->pWorker);
*/
static void dbReleaseSnapshot(lsm_env *pEnv, Snapshot **ppSnap){
  lsmDbSnapshotRelease(pEnv, *ppSnap);
  *ppSnap = 0;
}

/*
** If database handle pDb is currently holding a client snapshot, but does
** not have any open cursors or write transactions, release it.
*/
static void dbReleaseClientSnapshot(lsm_db *pDb){
  if( pDb->nTransOpen==0 && pDb->pCsr==0 ){
    lsmFinishReadTrans(pDb);







<
<
<
<
<
<
<
<
<
<
<







88
89
90
91
92
93
94











95
96
97
98
99
100
101
}

lsm_env *lsm_get_env(lsm_db *pDb){
  assert( pDb->pEnv );
  return pDb->pEnv;
}












/*
** If database handle pDb is currently holding a client snapshot, but does
** not have any open cursors or write transactions, release it.
*/
static void dbReleaseClientSnapshot(lsm_db *pDb){
  if( pDb->nTransOpen==0 && pDb->pCsr==0 ){
    lsmFinishReadTrans(pDb);

Changes to src/lsm_shared.c.

74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
...
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
...
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
...
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
...
397
398
399
400
401
402
403
404
405

406
407
408
409
410
411
412
...
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
...
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
}
static void assertNotInFreelist(Freelist *p, int iBlk){
  int i; 
  for(i=0; i<p->nEntry; i++){
    assert( p->aEntry[i].iBlk!=iBlk );
  }
}
static void assertMustbeWorker(lsm_db *pDb){
  assert( pDb->pWorker );
}
#else
# define assertNotInFreelist(x,y)
# define assertMustbeWorker(x)
#endif

/*
** Append an entry to the free-list.
*/
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){

................................................................................
  int nNew = p->nEntry - 1;
  assert( nNew>=0 );
  memmove(&p->aEntry[0], &p->aEntry[1], sizeof(FreelistEntry) * nNew);
  p->nEntry = nNew;
}

/*
** This function frees all resources held by the Database structure passed
** as the only argument.
*/
static void freeDatabase(lsm_env *pEnv, Database *p){
  assert( holdingGlobalMutex(pEnv) );
  if( p ){
    /* Free the mutexes */
    lsmMutexDel(pEnv, p->pClientMutex);
................................................................................
  return pSnapshot->pLevel;
}

void lsmDbSnapshotSetLevel(Snapshot *pSnap, Level *pLevel){
  pSnap->pLevel = pLevel;
}

/*
** Get/set methods for the snapshot block-count. These should only be
** used with worker snapshots.
*/
void lsmSnapshotSetNBlock(Snapshot *pSnap, int nNew){
}

static void snapshotDecrRefcnt(lsm_env *pEnv, Snapshot *pSnap){
#if 0
  Database *p = pSnap->pDatabase;

  pSnap->nRef--;
  assert( pSnap->nRef>=0 );
  if( pSnap->nRef==0 ){
    Snapshot *pIter = p->pClient;
    assert( pSnap!=pIter );
    while( pIter->pSnapshotNext!=pSnap ) pIter = pIter->pSnapshotNext;
    pIter->pSnapshotNext = pSnap->pSnapshotNext;
    freeClientSnapshot(pEnv, pSnap);
  }
#endif
}

/*
** Release a snapshot reference obtained by calling lsmDbSnapshotWorker()
** or lsmDbSnapshotClient().
*/
void lsmDbSnapshotRelease(lsm_env *pEnv, Snapshot *pSnap){
  if( pSnap ){
    Database *p = pSnap->pDatabase;

    /* If this call is to release a pointer to the worker snapshot, relinquish
    ** the worker mutex.  
    **
    ** If pSnap is a client snapshot, decrement the reference count. When the
    ** reference count reaches zero, free the snapshot object. The decrement
    ** and (nRef==0) test are protected by the database client mutex.
    */
    lsmMutexEnter(pEnv, p->pClientMutex);
    snapshotDecrRefcnt(pEnv, pSnap);
    lsmMutexLeave(pEnv, p->pClientMutex);
  }
}

/*
** Allocate a new database file block to write data to, either by extending
** the database file or by recycling a free-list entry. The worker snapshot 
** must be held in order to call this function.
**
** If successful, *piBlk is set to the block number allocated and LSM_OK is
................................................................................
    /* TODO: The "has been checkpointed" bit */

    rc = lsmLsmInUse(pDb, iFree, &bInUse);
    if( rc==LSM_OK && bInUse==0 ){
      iRet = pFree->aEntry[0].iBlk;
      flRemoveEntry0(pFree);
      assert( iRet!=0 );
      if( p->bRecordDelta ){
        p->nFreelistDelta++;
      }
    }
  }

  /* If no block was allocated from the free-list, allocate one at the
  ** end of the file. */
  if( rc==LSM_OK && iRet==0 ){
    iRet = ++pDb->pWorker->nBlock;
................................................................................
**
** If successful, LSM_OK is returned. Otherwise, an lsm error code (e.g. 
** LSM_NOMEM).
*/
int lsmBlockFree(lsm_db *pDb, int iBlk){
  Snapshot *p = pDb->pWorker;

  assertMustbeWorker(pDb);
  assert( p->bRecordDelta==0 );

  return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId);
}

/*
** Refree a database block. The worker snapshot must be held in order to call 
** this function.
**
................................................................................
  int rc = LSM_OK;                /* Return code */
  Snapshot *p = pDb->pWorker;

  if( iBlk==p->nBlock ){
    p->nBlock--;
  }else{
    rc = flInsertEntry(pDb->pEnv, &p->freelist, iBlk);
    if( p->bRecordDelta ){ p->nFreelistDelta--; }
  }

  return rc;
}

void lsmFreelistDeltaBegin(lsm_db *pDb){
  assertMustbeWorker(pDb);
  assert( pDb->pWorker->bRecordDelta==0 );
  pDb->pWorker->nFreelistDelta = 0;
  pDb->pWorker->bRecordDelta = 1;
}

void lsmFreelistDeltaEnd(lsm_db *pDb){
  assertMustbeWorker(pDb);
  pDb->pWorker->bRecordDelta = 0;
}

int lsmFreelistDelta(lsm_db *pDb){
  return pDb->pWorker->nFreelistDelta;
}

/*
** Return the current contents of the free-list as a list of integers.
*/
int lsmSnapshotFreelist(lsm_db *pDb, int **paFree, int *pnFree){
  int rc = LSM_OK;                /* Return Code */
#if 0
  int *aFree = 0;                 /* Integer array to return via *paFree */
  int nFree;                      /* Value to return via *pnFree */
  Freelist *p;                    /* Database free list object */

  assert( pDb->pWorker );
  p = &pDb->pDatabase->freelist;
  nFree = p->nEntry;
  if( nFree && paFree ){
    aFree = lsmMallocRc(pDb->pEnv, sizeof(int) * nFree, &rc);
    if( aFree ){
      int i;
      for(i=0; i<nFree; i++){
        aFree[i] = p->aEntry[i].iBlk;
      }
    }
  }

  *pnFree = nFree;
  if( paFree ) *paFree = aFree;
#endif
  return rc;
}

int lsmGetFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  u32 **paFree,                   /* OUT: malloc'd array */
  int *pnFree                     /* OUT: Size of array at *paFree */
){
  int rc = LSM_OK;                /* Return Code */
#if 0
  u32 *aFree = 0;                 /* Integer array to return via *paFree */
  int nFree;                      /* Value to return via *pnFree */
  Freelist *p;                    /* Database free list object */

  assert( pDb->pWorker );
  p = &pDb->pDatabase->freelist;
  nFree = p->nEntry * 3;
  if( nFree && paFree ){
    aFree = lsmMallocRc(pDb->pEnv, sizeof(u32) * nFree, &rc);
    if( aFree ){
      int i;
      for(i=0; i<p->nEntry; i++){
        aFree[i*3] = p->aEntry[i].iBlk;
        aFree[i*3+1] = (u32)((p->aEntry[i].iId >> 32) & 0xFFFFFFFF);
        aFree[i*3+2] = (u32)(p->aEntry[i].iId & 0xFFFFFFFF);
      }
    }
  }

  *pnFree = nFree;
  if( paFree ) *paFree = aFree;
#endif
  return rc;
}

int lsmSetFreelist(lsm_db *pDb, u32 *aElem, int nElem){
#if 0
  Database *p = pDb->pDatabase;
  lsm_env *pEnv = pDb->pEnv;
  int rc = LSM_OK;                /* Return code */
  int i;                          /* Iterator variable */
  Freelist *pFree;                /* Database free-list */

  assert( (nElem%3)==0 );

  pFree = &p->freelist;
  for(i=0; i<nElem; i+=3){
    i64 iId = ((i64)(aElem[i+1]) << 32) + aElem[i+2];
    rc = lsmFreelistAppend(pEnv, pFree, aElem[i], iId);
  }

  return rc;
#endif
  return LSM_OK;
}

/*
** If required, copy a database checkpoint from shared memory into the
** database itself.
**
** The WORKER lock must not be held when this is called. This is because
** this function may indirectly call fsync(). And the WORKER lock should
** not be held that long (in case it is required by a client flushing an
................................................................................
    if( rc==LSM_OK ) rc = lsmTreeLoadHeader(pDb);
    if( rc==LSM_OK ) lsmLogCheckpoint(pDb, iLogoff);
    if( rc==LSM_OK ) lsmTreeEndTransaction(pDb, 1);
    if( rc==LSM_BUSY ) rc = LSM_OK;
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK);
  return rc;
}

/*
** This function is called when a connection is about to run log file
** recovery (read the contents of the log file from disk and create a new
** in memory tree from it). This happens when the very first connection
** starts up and connects to the database.
**
** This sets the connections tree-version handle to one suitable to insert
** the read data into.
**
** Once recovery is complete (regardless of whether or not it is successful),
** lsmFinishRecovery() must be called to release resources locked by
** this function.
*/
int lsmBeginRecovery(lsm_db *pDb){
  int rc = LSM_OK;                /* Return code */
  Database *p = pDb->pDatabase;   /* Shared data handle */

  assert( 0 );

  assert( p );
  assert( pDb->pWorker );
  assert( pDb->pClient==0 );

#if 0
  if( rc==LSM_OK ){
    assert( pDb->pTV==0 );
    rc = lsmTreeWriteVersion(pDb->pEnv, p->pTree, &pDb->pTV);
  }
#endif
  return rc;
}

int lsmBeginWork(lsm_db *pDb){
  int rc;

  /* Attempt to take the WORKER lock */







<
<
<


<







 







|







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







<
<
<







 







|
|
>







 







<





<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







74
75
76
77
78
79
80



81
82

83
84
85
86
87
88
89
...
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
...
291
292
293
294
295
296
297











































298
299
300
301
302
303
304
...
324
325
326
327
328
329
330



331
332
333
334
335
336
337
...
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
...
371
372
373
374
375
376
377

378
379
380
381
382


































































































383
384
385
386
387
388
389
...
442
443
444
445
446
447
448
































449
450
451
452
453
454
455
}
static void assertNotInFreelist(Freelist *p, int iBlk){
  int i; 
  for(i=0; i<p->nEntry; i++){
    assert( p->aEntry[i].iBlk!=iBlk );
  }
}



#else
# define assertNotInFreelist(x,y)

#endif

/*
** Append an entry to the free-list.
*/
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){

................................................................................
  int nNew = p->nEntry - 1;
  assert( nNew>=0 );
  memmove(&p->aEntry[0], &p->aEntry[1], sizeof(FreelistEntry) * nNew);
  p->nEntry = nNew;
}

/*
** tHIS Function frees all resources held by the Database structure passed
** as the only argument.
*/
static void freeDatabase(lsm_env *pEnv, Database *p){
  assert( holdingGlobalMutex(pEnv) );
  if( p ){
    /* Free the mutexes */
    lsmMutexDel(pEnv, p->pClientMutex);
................................................................................
  return pSnapshot->pLevel;
}

void lsmDbSnapshotSetLevel(Snapshot *pSnap, Level *pLevel){
  pSnap->pLevel = pLevel;
}













































/*
** Allocate a new database file block to write data to, either by extending
** the database file or by recycling a free-list entry. The worker snapshot 
** must be held in order to call this function.
**
** If successful, *piBlk is set to the block number allocated and LSM_OK is
................................................................................
    /* TODO: The "has been checkpointed" bit */

    rc = lsmLsmInUse(pDb, iFree, &bInUse);
    if( rc==LSM_OK && bInUse==0 ){
      iRet = pFree->aEntry[0].iBlk;
      flRemoveEntry0(pFree);
      assert( iRet!=0 );



    }
  }

  /* If no block was allocated from the free-list, allocate one at the
  ** end of the file. */
  if( rc==LSM_OK && iRet==0 ){
    iRet = ++pDb->pWorker->nBlock;
................................................................................
**
** If successful, LSM_OK is returned. Otherwise, an lsm error code (e.g. 
** LSM_NOMEM).
*/
int lsmBlockFree(lsm_db *pDb, int iBlk){
  Snapshot *p = pDb->pWorker;

  assert( lsmShmAssertWorker(pDb) );
  /* TODO: Should assert() that lsmCheckpointOverflow() has not been called */

  return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId);
}

/*
** Refree a database block. The worker snapshot must be held in order to call 
** this function.
**
................................................................................
  int rc = LSM_OK;                /* Return code */
  Snapshot *p = pDb->pWorker;

  if( iBlk==p->nBlock ){
    p->nBlock--;
  }else{
    rc = flInsertEntry(pDb->pEnv, &p->freelist, iBlk);

  }

  return rc;
}



































































































/*
** If required, copy a database checkpoint from shared memory into the
** database itself.
**
** The WORKER lock must not be held when this is called. This is because
** this function may indirectly call fsync(). And the WORKER lock should
** not be held that long (in case it is required by a client flushing an
................................................................................
    if( rc==LSM_OK ) rc = lsmTreeLoadHeader(pDb);
    if( rc==LSM_OK ) lsmLogCheckpoint(pDb, iLogoff);
    if( rc==LSM_OK ) lsmTreeEndTransaction(pDb, 1);
    if( rc==LSM_BUSY ) rc = LSM_OK;
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK);
































  return rc;
}

int lsmBeginWork(lsm_db *pDb){
  int rc;

  /* Attempt to take the WORKER lock */

Changes to src/lsm_sorted.c.

2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
....
3534
3535
3536
3537
3538
3539
3540
3541
3542
3543
3544
3545
3546
3547
3548
....
3576
3577
3578
3579
3580
3581
3582
3583
3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
....
4052
4053
4054
4055
4056
4057
4058
4059
4060
4061
4062
4063
4064
4065
4066
4067
4068
4069
      void *aVal;
      int nVal;

      assert( pCsr->pSystemVal==0 );
      rc = lsmCheckpointOverflow(pCsr->pDb, &aVal, &nVal, pCsr->pnOvfl);
      *ppVal = pCsr->pSystemVal = aVal;
      *pnVal = nVal;
      lsmFreelistDeltaBegin(pCsr->pDb);
    }else{
      *ppVal = 0;
      *pnVal = 0;
    }
  }else if( iVal-CURSOR_DATA_SEGMENT<pCsr->nSegCsr 
         && segmentCursorValid(&pCsr->aSegCsr[iVal-CURSOR_DATA_SEGMENT]) 
  ){
................................................................................
    while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
      rc = mergeWorkerStep(&mergeworker);
    }

    mergeWorkerShutdown(&mergeworker, &rc);
    pNew->pMerge = 0;
  }
  lsmFreelistDeltaEnd(pDb);

  /* Link the new level into the top of the tree. */
  if( rc==LSM_OK ){
    if( pDel ){
      pDel->iRoot = 0;
    }
  }else{
................................................................................
  int *pnOvfl                     /* OUT: Number of free-list entries written */
){
  int rc;

  assert( pDb->pWorker );

  /* If there is nothing to do, return early. */
  if( lsmTreeSize(pDb)==0 ){
    int nOvfl = 0;
    lsmCheckpointOverflow(pDb, 0, 0, &nOvfl);
    if( nOvfl==0 ){
      *pnOvfl = 0;
      return LSM_OK;
    }
  }

  rc = sortedNewToplevel(pDb, 1, pnOvfl);
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "tree flush");
................................................................................
    assert( pDb->pWorker==0 );
    rc = lsmBeginWork(pDb);
    if( rc==LSM_OK ){
      rc = sortedWork(pDb, nPage, bOptimize, &nWrite);
    }

    if( rc==LSM_OK && nWrite ){
      int nExpectOvfl = 0;
      lsmCheckpointOverflow(pDb, 0, 0, &nExpectOvfl);
      rc = lsmSortedFlushDb(pDb);
      if( rc==LSM_OK && nExpectOvfl ){
        rc = sortedNewToplevel(pDb, 0, &nOvfl);
      }
    }

    if( nWrite ){
      lsmFinishWork(pDb, 0, nOvfl, &rc);
    }else{







<







 







<







 







|
<
<
<
|
|
<







 







<
<

|







2179
2180
2181
2182
2183
2184
2185

2186
2187
2188
2189
2190
2191
2192
....
3533
3534
3535
3536
3537
3538
3539

3540
3541
3542
3543
3544
3545
3546
....
3574
3575
3576
3577
3578
3579
3580
3581



3582
3583

3584
3585
3586
3587
3588
3589
3590
....
4046
4047
4048
4049
4050
4051
4052


4053
4054
4055
4056
4057
4058
4059
4060
4061
      void *aVal;
      int nVal;

      assert( pCsr->pSystemVal==0 );
      rc = lsmCheckpointOverflow(pCsr->pDb, &aVal, &nVal, pCsr->pnOvfl);
      *ppVal = pCsr->pSystemVal = aVal;
      *pnVal = nVal;

    }else{
      *ppVal = 0;
      *pnVal = 0;
    }
  }else if( iVal-CURSOR_DATA_SEGMENT<pCsr->nSegCsr 
         && segmentCursorValid(&pCsr->aSegCsr[iVal-CURSOR_DATA_SEGMENT]) 
  ){
................................................................................
    while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
      rc = mergeWorkerStep(&mergeworker);
    }

    mergeWorkerShutdown(&mergeworker, &rc);
    pNew->pMerge = 0;
  }


  /* Link the new level into the top of the tree. */
  if( rc==LSM_OK ){
    if( pDel ){
      pDel->iRoot = 0;
    }
  }else{
................................................................................
  int *pnOvfl                     /* OUT: Number of free-list entries written */
){
  int rc;

  assert( pDb->pWorker );

  /* If there is nothing to do, return early. */
  if( lsmTreeSize(pDb)==0 && lsmCheckpointOverflowRequired(pDb)==0 ){



    *pnOvfl = 0;
    return LSM_OK;

  }

  rc = sortedNewToplevel(pDb, 1, pnOvfl);
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "tree flush");
................................................................................
    assert( pDb->pWorker==0 );
    rc = lsmBeginWork(pDb);
    if( rc==LSM_OK ){
      rc = sortedWork(pDb, nPage, bOptimize, &nWrite);
    }

    if( rc==LSM_OK && nWrite ){


      rc = lsmSortedFlushDb(pDb);
      if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){
        rc = sortedNewToplevel(pDb, 0, &nOvfl);
      }
    }

    if( nWrite ){
      lsmFinishWork(pDb, 0, nOvfl, &rc);
    }else{