Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Reduce the number of times malloc() is called when inserting a new entry. Ensure pointers to all shared-memory chunks are loaded when a read-transaction is opened.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 7ead7175e2e3b32a2ba6f3b04ec23eea3a1ece5f
User & Date: dan 2012-11-30 19:00:59.175
Context
2012-12-03
13:41
Add www/lsmperf.wiki. check-in: 938bb92e37 user: dan tags: trunk
2012-11-30
19:00
Reduce the number of times malloc() is called when inserting a new entry. Ensure pointers to all shared-memory chunks are loaded when a read-transaction is opened. check-in: 7ead7175e2 user: dan tags: trunk
2012-11-29
19:14
Avoid reading and checksumming an entire meta-page every time a write transaction is opened. check-in: d4c5a3bad7 user: dan tags: trunk
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsmInt.h.
787
788
789
790
791
792
793

794
795
796
797
798
799
800
*/
int lsmLogBegin(lsm_db *pDb);
int lsmLogWrite(lsm_db *, void *, int, void *, int);
int lsmLogCommit(lsm_db *);
void lsmLogEnd(lsm_db *pDb, int bCommit);
void lsmLogTell(lsm_db *, LogMark *);
void lsmLogSeek(lsm_db *, LogMark *);


int lsmLogRecover(lsm_db *);
int lsmInfoLogStructure(lsm_db *pDb, char **pzVal);


/**************************************************************************
** Functions from file "lsm_shared.c".







>







787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
*/
int lsmLogBegin(lsm_db *pDb);
int lsmLogWrite(lsm_db *, void *, int, void *, int);
int lsmLogCommit(lsm_db *);
void lsmLogEnd(lsm_db *pDb, int bCommit);
void lsmLogTell(lsm_db *, LogMark *);
void lsmLogSeek(lsm_db *, LogMark *);
void lsmLogClose(lsm_db *);

int lsmLogRecover(lsm_db *);
int lsmInfoLogStructure(lsm_db *pDb, char **pzVal);


/**************************************************************************
** Functions from file "lsm_shared.c".
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861


/* Candidate values for the 3rd argument to lsmShmLock() */
#define LSM_LOCK_UNLOCK 0
#define LSM_LOCK_SHARED 1
#define LSM_LOCK_EXCL   2

int lsmShmChunk(lsm_db *db, int iChunk, void **ppData);
int lsmShmLock(lsm_db *db, int iLock, int eOp, int bBlock);
void lsmShmBarrier(lsm_db *db);

#ifdef LSM_DEBUG
void lsmShmHasLock(lsm_db *db, int iLock, int eOp);
#else
# define lsmShmHasLock(x,y,z)







|







848
849
850
851
852
853
854
855
856
857
858
859
860
861
862


/* Candidate values for the 3rd argument to lsmShmLock() */
#define LSM_LOCK_UNLOCK 0
#define LSM_LOCK_SHARED 1
#define LSM_LOCK_EXCL   2

int lsmShmCacheChunks(lsm_db *db, int nChunk);
int lsmShmLock(lsm_db *db, int iLock, int eOp, int bBlock);
void lsmShmBarrier(lsm_db *db);

#ifdef LSM_DEBUG
void lsmShmHasLock(lsm_db *db, int iLock, int eOp);
#else
# define lsmShmHasLock(x,y,z)
Changes to src/lsm_log.c.
213
214
215
216
217
218
219
220
221
222
223
224

225
226
227
228
229
230
231
**   sectors as reported by lsmFsSectorSize().
*/
struct LogWriter {
  u32 cksum0;                     /* Checksum 0 at offset iOff */
  u32 cksum1;                     /* Checksum 1 at offset iOff */
  int iCksumBuf;                  /* Bytes of buf that have been checksummed */
  i64 iOff;                       /* Offset at start of buffer buf */
  LsmString buf;                  /* Buffer containing data not yet written */
  int szSector;                   /* Sector size for this transaction */
  LogRegion jump;                 /* Avoid writing to this region */
  i64 iRegion1End;                /* End of first region written by trans */
  i64 iRegion2Start;              /* Start of second regions written by trans */

};

/*
** Return the result of interpreting the first 4 bytes in buffer aIn as 
** a 32-bit unsigned little-endian integer.
*/
static u32 getU32le(u8 *aIn){







<




>







213
214
215
216
217
218
219

220
221
222
223
224
225
226
227
228
229
230
231
**   sectors as reported by lsmFsSectorSize().
*/
struct LogWriter {
  u32 cksum0;                     /* Checksum 0 at offset iOff */
  u32 cksum1;                     /* Checksum 1 at offset iOff */
  int iCksumBuf;                  /* Bytes of buf that have been checksummed */
  i64 iOff;                       /* Offset at start of buffer buf */

  int szSector;                   /* Sector size for this transaction */
  LogRegion jump;                 /* Avoid writing to this region */
  i64 iRegion1End;                /* End of first region written by trans */
  i64 iRegion2Start;              /* Start of second regions written by trans */
  LsmString buf;                  /* Buffer containing data not yet written */
};

/*
** Return the result of interpreting the first 4 bytes in buffer aIn as 
** a 32-bit unsigned little-endian integer.
*/
static u32 getU32le(u8 *aIn){
350
351
352
353
354
355
356



357

358
359
360
361
362







363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
*/
int lsmLogBegin(lsm_db *pDb){
  int rc = LSM_OK;
  LogWriter *pNew;
  LogRegion *aReg;

  if( pDb->bUseLog==0 ) return LSM_OK;



  rc = lsmFsOpenLog(pDb->pFS);

  pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc);
  if( pNew ){
    lsmStringInit(&pNew->buf, pDb->pEnv);
    rc = lsmStringExtend(&pNew->buf, 2);
  }







  if( rc==LSM_OK ){
    /* The following call detects whether or not a new snapshot has been 
    ** synced into the database file. If so, it updates the contents of
    ** the pDb->treehdr.log structure to reclaim any space in the log
    ** file that is no longer required. 
    **
    ** TODO: Calling this every transaction is overkill. And since the 
    ** call has to read and checksum a snapshot from the database file,
    ** it is expensive. It would be better to figure out a way so that
    ** this is only called occasionally - say for every 32KB written to 
    ** the log file.
    */
    rc = logReclaimSpace(pDb);
  }
  if( rc!=LSM_OK ){
    if( pNew ) lsmFree(pDb->pEnv, pNew->buf.z);
    lsmFree(pDb->pEnv, pNew);
    return rc;
  }

  /* Set the effective sector-size for this transaction. Sectors are assumed
  ** to be one byte in size if the safety-mode is OFF or NORMAL, or as
  ** reported by lsmFsSectorSize if it is FULL.  */
  if( pDb->eSafety==LSM_SAFETY_FULL ){







>
>
>

>
|
|
|
|
|
>
>
>
>
>
>
>















<
|







350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388

389
390
391
392
393
394
395
396
*/
int lsmLogBegin(lsm_db *pDb){
  int rc = LSM_OK;
  LogWriter *pNew;
  LogRegion *aReg;

  if( pDb->bUseLog==0 ) return LSM_OK;

  /* If the log file has not yet been opened, open it now. Also allocate
  ** the LogWriter structure, if it has not already been allocated.  */
  rc = lsmFsOpenLog(pDb->pFS);
  if( pDb->pLogWriter==0 ){
    pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc);
    if( pNew ){
      lsmStringInit(&pNew->buf, pDb->pEnv);
      rc = lsmStringExtend(&pNew->buf, 2);
    }
  }else{
    pNew = pDb->pLogWriter;
    assert( (u8 *)(&pNew[1])==(u8 *)(&((&pNew->buf)[1])) );
    memset(pNew, 0, ((u8 *)&pNew->buf) - (u8 *)pNew);
    pNew->buf.n = 0;
  }

  if( rc==LSM_OK ){
    /* The following call detects whether or not a new snapshot has been 
    ** synced into the database file. If so, it updates the contents of
    ** the pDb->treehdr.log structure to reclaim any space in the log
    ** file that is no longer required. 
    **
    ** TODO: Calling this every transaction is overkill. And since the 
    ** call has to read and checksum a snapshot from the database file,
    ** it is expensive. It would be better to figure out a way so that
    ** this is only called occasionally - say for every 32KB written to 
    ** the log file.
    */
    rc = logReclaimSpace(pDb);
  }
  if( rc!=LSM_OK ){

    lsmLogClose(pDb);
    return rc;
  }

  /* Set the effective sector-size for this transaction. Sectors are assumed
  ** to be one byte in size if the safety-mode is OFF or NORMAL, or as
  ** reported by lsmFsSectorSize if it is FULL.  */
  if( pDb->eSafety==LSM_SAFETY_FULL ){
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
      assert( pLog->aRegion[1].iEnd==0 );
      assert( pLog->aRegion[2].iStart<p->iRegion1End );
      pLog->aRegion[1].iStart = pLog->aRegion[2].iStart;
      pLog->aRegion[1].iEnd = p->iRegion1End;
      pLog->aRegion[2].iStart = p->iRegion2Start;
    }
  }
  lsmStringClear(&p->buf);
  lsmFree(pDb->pEnv, p);
  pDb->pLogWriter = 0;
}

static int jumpIfRequired(
  lsm_db *pDb,
  LogWriter *pLog,
  int nReq,
  int *pbJump







<
<
<







488
489
490
491
492
493
494



495
496
497
498
499
500
501
      assert( pLog->aRegion[1].iEnd==0 );
      assert( pLog->aRegion[2].iStart<p->iRegion1End );
      pLog->aRegion[1].iStart = pLog->aRegion[2].iStart;
      pLog->aRegion[1].iEnd = p->iRegion1End;
      pLog->aRegion[2].iStart = p->iRegion2Start;
    }
  }



}

static int jumpIfRequired(
  lsm_db *pDb,
  LogWriter *pLog,
  int nReq,
  int *pbJump
1098
1099
1100
1101
1102
1103
1104










  }

  lsmStringClear(&buf1);
  lsmStringClear(&buf2);
  lsmStringClear(&reader.buf);
  return rc;
}

















>
>
>
>
>
>
>
>
>
>
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
  }

  lsmStringClear(&buf1);
  lsmStringClear(&buf2);
  lsmStringClear(&reader.buf);
  return rc;
}

void lsmLogClose(lsm_db *db){
  if( db->pLogWriter ){
    lsmFree(db->pEnv, db->pLogWriter->buf.z);
    lsmFree(db->pEnv, db->pLogWriter);
    db->pLogWriter = 0;
  }
}


Changes to src/lsm_main.c.
187
188
189
190
191
192
193

194

195
196
197
198
199
200
201
    assert_db_state(pDb);
    if( pDb->pCsr || pDb->nTransOpen ){
      rc = LSM_MISUSE_BKPT;
    }else{
      lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
      pDb->pClient = 0;
      lsmDbDatabaseRelease(pDb);

      lsmFsClose(pDb->pFS);

      lsmFree(pDb->pEnv, pDb->aTrans);
      lsmFree(pDb->pEnv, pDb->apShm);
      lsmFree(pDb->pEnv, pDb);
    }
  }
  return rc;
}







>

>







187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
    assert_db_state(pDb);
    if( pDb->pCsr || pDb->nTransOpen ){
      rc = LSM_MISUSE_BKPT;
    }else{
      lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
      pDb->pClient = 0;
      lsmDbDatabaseRelease(pDb);
      lsmLogClose(pDb);
      lsmFsClose(pDb->pFS);
      lsmFree(pDb->pEnv, pDb->rollback.aArray);
      lsmFree(pDb->pEnv, pDb->aTrans);
      lsmFree(pDb->pEnv, pDb->apShm);
      lsmFree(pDb->pEnv, pDb);
    }
  }
  return rc;
}
Changes to src/lsm_shared.c.
200
201
202
203
204
205
206
207
208

209
210
211
212
213
214
215
static int doDbConnect(lsm_db *pDb){
  const int nUsMax = 100000;      /* Max value for nUs */
  int nUs = 1000;                 /* us to wait between DMS1 attempts */
  int rc;

  /* Obtain a pointer to the shared-memory header */
  assert( pDb->pShmhdr==0 );
  rc = lsmShmChunk(pDb, 0, (void **)&pDb->pShmhdr);
  if( rc!=LSM_OK ) return rc;


  /* Block for an exclusive lock on DMS1. This lock serializes all calls
  ** to doDbConnect() and doDbDisconnect() across all processes.  */
  while( 1 ){
    rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
    if( rc!=LSM_BUSY ) break;
    lsmEnvSleep(pDb->pEnv, nUs);







|

>







200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
static int doDbConnect(lsm_db *pDb){
  const int nUsMax = 100000;      /* Max value for nUs */
  int nUs = 1000;                 /* us to wait between DMS1 attempts */
  int rc;

  /* Obtain a pointer to the shared-memory header */
  assert( pDb->pShmhdr==0 );
  rc = lsmShmCacheChunks(pDb, 1);
  if( rc!=LSM_OK ) return rc;
  pDb->pShmhdr = (ShmHeader *)pDb->apShm[0];

  /* Block for an exclusive lock on DMS1. This lock serializes all calls
  ** to doDbConnect() and doDbDisconnect() across all processes.  */
  while( 1 ){
    rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
    if( rc!=LSM_BUSY ) break;
    lsmEnvSleep(pDb->pEnv, nUs);
836
837
838
839
840
841
842



843
844
845
846
847
848
849
      (int)pDb->treehdr.root.iTransId,
      (int)pDb->treehdr.iOldShmid
  );
}
#endif
  }




  if( rc!=LSM_OK ){
    lsmReleaseReadlock(pDb);
  }
  if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;
  return rc;
}








>
>
>







837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
      (int)pDb->treehdr.root.iTransId,
      (int)pDb->treehdr.iOldShmid
  );
}
#endif
  }

  if( rc==LSM_OK ){
    rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
  }
  if( rc!=LSM_OK ){
    lsmReleaseReadlock(pDb);
  }
  if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;
  return rc;
}

1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170


1171
1172
1173


1174





1175
1176

1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191


1192
1193
1194

1195
1196
1197
1198
1199

1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213

1214

1215
1216
1217
1218

1219
1220

1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
**************************************************************************
**************************************************************************
**************************************************************************
**************************************************************************
*************************************************************************/

/*
** Retrieve a pointer to shared-memory chunk iChunk. Chunks are numbered
** starting from 0 (i.e. the header chunk is chunk 0).
*/
int lsmShmChunk(lsm_db *db, int iChunk, void **ppData){
  int rc = LSM_OK;


  void *pRet = 0;
  Database *p = db->pDatabase;
  lsm_env *pEnv = db->pEnv;








  while( iChunk>=db->nShm ){
    void **apShm;

    apShm = lsmRealloc(pEnv, db->apShm, sizeof(void*)*(db->nShm+16));
    if( !apShm ) return LSM_NOMEM_BKPT;
    memset(&apShm[db->nShm], 0, sizeof(void*)*16);
    db->apShm = apShm;
    db->nShm += 16;
  }
  if( db->apShm[iChunk] ){
    *ppData = db->apShm[iChunk];
    return rc;
  }

  /* Enter the client mutex */
  assert( iChunk>=0 );
  lsmMutexEnter(pEnv, p->pClientMutex);



  if( iChunk>=p->nShmChunk ){
    int nNew = iChunk+1;
    void **apNew;

    apNew = (void **)lsmRealloc(pEnv, p->apShmChunk, sizeof(void*) * nNew);
    if( apNew==0 ){
      rc = LSM_NOMEM_BKPT;
    }else{
      memset(&apNew[p->nShmChunk], 0, sizeof(void*) * (nNew-p->nShmChunk));

      p->apShmChunk = apNew;
      p->nShmChunk = nNew;
    }
  }

  if( rc==LSM_OK && p->apShmChunk[iChunk]==0 ){
    void *pChunk = 0;
    if( p->pFile==0 ){
      /* Single process mode */
      pChunk = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
    }else{
      /* Multi-process mode */
      rc = lsmEnvShmMap(pEnv, p->pFile, iChunk, LSM_SHM_CHUNK_SIZE, &pChunk);
    }

    p->apShmChunk[iChunk] = pChunk;

  }

  if( rc==LSM_OK ){
    pRet = p->apShmChunk[iChunk];

  }


  /* Release the client mutex */
  lsmMutexLeave(pEnv, p->pClientMutex);

  *ppData = db->apShm[iChunk] = pRet; 
  return rc;
}

/*
** Attempt to obtain the lock identified by the iLock and bExcl parameters.
** If successful, return LSM_OK. If the lock cannot be obtained because 
** there exists some other conflicting lock, return LSM_BUSY. If some other







|
|

|

>
>
|
|
|
>
>

>
>
>
>
>
|
|
>
|
|
<
|
<
|
<
<
<
|
<
|
<
|

>
>
|
|
|
>
|
|
|
|
<
>
|
<

|
|
|
|
|
|
|
|
|
|
|
>
|
>
|
|
|
|
>
|
|
>
|
|
|
|







1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192

1193

1194



1195

1196

1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208

1209
1210

1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
**************************************************************************
**************************************************************************
**************************************************************************
**************************************************************************
*************************************************************************/

/*
** Ensure that database connection db has cached pointers to at least the 
** first nChunk chunks of shared memory.
*/
int lsmShmCacheChunks(lsm_db *db, int nChunk){
  int rc = LSM_OK;
  if( nChunk>db->nShm ){
    static const int NINCR = 16;
    void *pRet = 0;
    Database *p = db->pDatabase;
    lsm_env *pEnv = db->pEnv;
    int nAlloc;
    int i;

    /* Ensure that the db->apShm[] array is large enough. If an attempt to
    ** allocate memory fails, return LSM_NOMEM immediately. The apShm[] array
    ** is always extended in multiples of 16 entries - so the actual allocated
    ** size can be inferred from nShm.  */ 
    nAlloc = ((db->nShm + NINCR - 1) / NINCR) * NINCR;
    while( nChunk>=nAlloc ){
      void **apShm;
      nAlloc += NINCR;
      apShm = lsmRealloc(pEnv, db->apShm, sizeof(void*)*nAlloc);
      if( !apShm ) return LSM_NOMEM_BKPT;

      db->apShm = apShm;

    }





    /* Enter the client mutex */

    lsmMutexEnter(pEnv, p->pClientMutex);

    /* Extend the Database objects apShmChunk[] array if necessary. Using the
    ** same pattern as for the lsm_db.apShm[] array above.  */
    nAlloc = ((p->nShmChunk + NINCR - 1) / NINCR) * NINCR;
    while( nChunk>=nAlloc ){
      void **apShm;
      nAlloc +=  NINCR;
      apShm = lsmRealloc(pEnv, p->apShmChunk, sizeof(void*)*nAlloc);
      if( !apShm ){
        rc = LSM_NOMEM_BKPT;
        break;

      }
      p->apShmChunk = apShm;

    }

    for(i=db->nShm; rc==LSM_OK && i<nChunk; i++){
      if( i>=p->nShmChunk ){
        void *pChunk = 0;
        if( p->pFile==0 ){
          /* Single process mode */
          pChunk = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
        }else{
          /* Multi-process mode */
          rc = lsmEnvShmMap(pEnv, p->pFile, i, LSM_SHM_CHUNK_SIZE, &pChunk);
        }
        if( rc==LSM_OK ){
          p->apShmChunk[i] = pChunk;
          p->nShmChunk++;
        }
      }
      if( rc==LSM_OK ){
        db->apShm[i] = p->apShmChunk[i];
        db->nShm++;
      }
    }

    /* Release the client mutex */
    lsmMutexLeave(pEnv, p->pClientMutex);
  }

  return rc;
}

/*
** Attempt to obtain the lock identified by the iLock and bExcl parameters.
** If successful, return LSM_OK. If the lock cannot be obtained because 
** there exists some other conflicting lock, return LSM_BUSY. If some other
Changes to src/lsm_sorted.c.
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
/*
** The following constants are used to assign integers to each component
** cursor of a multi-cursor.
*/
#define CURSOR_DATA_TREE0     0   /* Current tree cursor (apTreeCsr[0]) */
#define CURSOR_DATA_TREE1     1   /* The "old" tree, if any (apTreeCsr[1]) */
#define CURSOR_DATA_SYSTEM    2   /* Free-list entries (new-toplevel only) */
#define CURSOR_DATA_SEGMENT   3

/*
** CURSOR_IGNORE_DELETE
**   If set, this cursor will not visit SORTED_DELETE keys.
**
** CURSOR_FLUSH_FREELIST
**   This cursor is being used to create a new toplevel. It should also 







|







220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
/*
** The following constants are used to assign integers to each component
** cursor of a multi-cursor.
*/
#define CURSOR_DATA_TREE0     0   /* Current tree cursor (apTreeCsr[0]) */
#define CURSOR_DATA_TREE1     1   /* The "old" tree, if any (apTreeCsr[1]) */
#define CURSOR_DATA_SYSTEM    2   /* Free-list entries (new-toplevel only) */
#define CURSOR_DATA_SEGMENT   3   /* First segment pointer (aPtr[0]) */

/*
** CURSOR_IGNORE_DELETE
**   If set, this cursor will not visit SORTED_DELETE keys.
**
** CURSOR_FLUSH_FREELIST
**   This cursor is being used to create a new toplevel. It should also 
4053
4054
4055
4056
4057
4058
4059




4060
4061
4062
4063
4064
4065
4066
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int nWrite = 0;                 /* Number of database pages written */
  Freelist freelist;





  assert( pDb->bUseFreelist==0 );
  pDb->pFreelist = &freelist;
  pDb->bUseFreelist = 1;
  memset(&freelist, 0, sizeof(freelist));

  /* Allocate the new level structure to write to. */







>
>
>
>







4053
4054
4055
4056
4057
4058
4059
4060
4061
4062
4063
4064
4065
4066
4067
4068
4069
4070
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int nWrite = 0;                 /* Number of database pages written */
  Freelist freelist;

  if( eTree!=TREE_NONE ){
    rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
  }

  assert( pDb->bUseFreelist==0 );
  pDb->pFreelist = &freelist;
  pDb->bUseFreelist = 1;
  memset(&freelist, 0, sizeof(freelist));

  /* Allocate the new level structure to write to. */
4085
4086
4087
4088
4089
4090
4091
4092
4093
4094
4095
4096
4097
4098
4099
4100
4101
4102
        && pNext && pNext->pMerge==0 && pNext->lhs.iRoot 
        && (eTree!=TREE_NONE || (pNext->flags & LEVEL_FREELIST_ONLY))
    ){
      pDel = &pNext->lhs;
      rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);
    }

    if( pNext==0 ){
      multiCursorIgnoreDelete(pCsr);
    }

    if( pNext==0 ){
      multiCursorIgnoreDelete(pCsr);
    }
  }

  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr);







<
|
<
|







4089
4090
4091
4092
4093
4094
4095

4096

4097
4098
4099
4100
4101
4102
4103
4104
        && pNext && pNext->pMerge==0 && pNext->lhs.iRoot 
        && (eTree!=TREE_NONE || (pNext->flags & LEVEL_FREELIST_ONLY))
    ){
      pDel = &pNext->lhs;
      rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);
    }


    /* If this will be the only segment in the database, discard any delete

    ** markers present in the in-memory tree.  */
    if( pNext==0 ){
      multiCursorIgnoreDelete(pCsr);
    }
  }

  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr);
4612
4613
4614
4615
4616
4617
4618
4619
4620
4621
4622
4623
4624
4625
4626
4627
4628
*/
static int sortedTreeHasOld(lsm_db *pDb, int *pRc){
  int rc = LSM_OK;
  int bRet = 0;

  assert( pDb->pWorker );
  if( *pRc==LSM_OK ){
    if( pDb->nTransOpen==0 ){
      rc = lsmTreeLoadHeader(pDb, 0);
    }
    if( rc==LSM_OK 
        && pDb->treehdr.iOldShmid
        && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff 
      ){
      bRet = 1;
    }else{
      bRet = 0;







<
<
<







4614
4615
4616
4617
4618
4619
4620



4621
4622
4623
4624
4625
4626
4627
*/
static int sortedTreeHasOld(lsm_db *pDb, int *pRc){
  int rc = LSM_OK;
  int bRet = 0;

  assert( pDb->pWorker );
  if( *pRc==LSM_OK ){



    if( rc==LSM_OK 
        && pDb->treehdr.iOldShmid
        && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff 
      ){
      bRet = 1;
    }else{
      bRet = 0;
4679
4680
4681
4682
4683
4684
4685



4686
4687
4688
4689
4690
4691
4692
      bCkpt = 1;
      nRem = LSM_MAX(nMax, 0);
    }
  }

  /* If there exists in-memory data ready to be flushed to disk, attempt
  ** to flush it now.  */



  if( sortedTreeHasOld(pDb, &rc) ){
    /* sortedDbIsFull() returns non-zero if either (a) there are too many
    ** levels in total in the db, or (b) there are too many levels with the
    ** the same age in the db. Either way, call sortedWork() to merge 
    ** existing segments together until this condition is cleared.  */
    if( sortedDbIsFull(pDb) ){
      int nPg = 0;







>
>
>







4678
4679
4680
4681
4682
4683
4684
4685
4686
4687
4688
4689
4690
4691
4692
4693
4694
      bCkpt = 1;
      nRem = LSM_MAX(nMax, 0);
    }
  }

  /* If there exists in-memory data ready to be flushed to disk, attempt
  ** to flush it now.  */
  if( pDb->nTransOpen==0 ){
    rc = lsmTreeLoadHeader(pDb, 0);
  }
  if( sortedTreeHasOld(pDb, &rc) ){
    /* sortedDbIsFull() returns non-zero if either (a) there are too many
    ** levels in total in the db, or (b) there are too many levels with the
    ** the same age in the db. Either way, call sortedWork() to merge 
    ** existing segments together until this condition is cleared.  */
    if( sortedDbIsFull(pDb) ){
      int nPg = 0;
Changes to src/lsm_tree.c.
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
  return LSM_OK;
}

/*
** Zero the IntArray object.
*/
static void intArrayFree(lsm_env *pEnv, IntArray *p){
  lsmFree(pEnv, p->aArray);
  memset(p, 0, sizeof(IntArray));
}

/*
** Return the number of entries currently in the int-array object.
*/
static int intArraySize(IntArray *p){
  return p->nArray;







|
<







237
238
239
240
241
242
243
244

245
246
247
248
249
250
251
  return LSM_OK;
}

/*
** Zero the IntArray object.
*/
static void intArrayFree(lsm_env *pEnv, IntArray *p){
  p->nArray = 0;

}

/*
** Return the number of entries currently in the int-array object.
*/
static int intArraySize(IntArray *p){
  return p->nArray;
288
289
290
291
292
293
294
295
296
297
298
299
300
301

302
303
304
305
306
307
308
309
310
311
312
313
314
315


316


317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
  return (int)(iOff>>15);
}

/*
** Return a pointer to the mapped memory location associated with *-shm 
** file offset iPtr.
*/
static void *treeShmptr(lsm_db *pDb, u32 iPtr, int *pRc){
  /* TODO: This will likely be way too slow. If it is, chunks should be
  ** cached as part of the db handle.  */
  void *pChunk;
  int iChunk = (iPtr>>15);
  assert( LSM_SHM_CHUNK_SIZE==(1<<15) );


  if( (pDb->nShm<=iChunk || 0==(pChunk = pDb->apShm[iChunk])) ){
    *pRc = lsmShmChunk(pDb, iChunk, &pChunk);
  }

  if( iPtr==0 || *pRc ) return 0;
  return &((u8 *)pChunk)[iPtr & (LSM_SHM_CHUNK_SIZE-1)];
}

static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
  int rcdummy = LSM_OK;
  return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcdummy);
}

static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){


  return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, pRc);


}


#ifndef NDEBUG
static void assertIsWorkingChild(
  lsm_db *db, 
  TreeNode *pNode, 
  TreeNode *pParent, 
  int iCell
){
  TreeNode *p;
  int rc = LSM_OK;
  u32 iPtr = getChildPtr(pParent, WORKING_VERSION, iCell);
  p = treeShmptr(db, iPtr, &rc);
  assert( p==pNode || rc!=LSM_OK );
}
#else
# define assertIsWorkingChild(w,x,y,z)
#endif

/* Values for the third argument to treeShmkey(). */
#define TKV_LOADKEY  1
#define TKV_LOADVAL  2

static TreeKey *treeShmkey(
  lsm_db *pDb,                    /* Database handle */
  u32 iPtr,                       /* Shmptr to TreeKey struct */
  int eLoad,                      /* Either zero or a TREEKEY_LOADXXX value */
  TreeBlob *pBlob,                /* Used if dynamic memory is required */
  int *pRc                        /* IN/OUT: Error code */
){
  TreeKey *pRet;

  assert( eLoad==TKV_LOADKEY || eLoad==TKV_LOADVAL );
  pRet = (TreeKey *)treeShmptr(pDb, iPtr, pRc);
  if( pRet ){
    int nReq;                     /* Bytes of space required at pRet */
    int nAvail;                   /* Bytes of space available at pRet */

    nReq = sizeof(TreeKey) + pRet->nKey;
    if( eLoad==TKV_LOADVAL && pRet->nValue>0 ){
      nReq += pRet->nValue;
    }
    assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
    nAvail = LSM_SHM_CHUNK_SIZE - (iPtr & (LSM_SHM_CHUNK_SIZE-1));

    if( nAvail<nReq ){
      if( tblobGrow(pDb, pBlob, nReq, pRc)==0 ){
        int nLoad = 0;
        while( *pRc==LSM_OK ){
          ShmChunk *pChunk;
          void *p = treeShmptr(pDb, iPtr, pRc);
          int n = LSM_MIN(nAvail, nReq-nLoad);

          memcpy(&pBlob->a[nLoad], p, n);
          nLoad += n;
          if( nLoad==nReq ) break;

          pChunk = treeShmChunk(pDb, treeOffsetToChunk(iPtr));







|
<
<
<
<
<

>
|
<
|
|
<
<



<
|



>
>
|
>
>













|
|



















|
















|







287
288
289
290
291
292
293
294





295
296
297

298
299


300
301
302

303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
  return (int)(iOff>>15);
}

/*
** Return a pointer to the mapped memory location associated with *-shm 
** file offset iPtr.
*/
static void *treeShmptr(lsm_db *pDb, u32 iPtr){






  assert( (iPtr>>15)<pDb->nShm );
  assert( pDb->apShm[iPtr>>15] );


  return iPtr?(&((u8*)(pDb->apShm[iPtr>>15]))[iPtr & (LSM_SHM_CHUNK_SIZE-1)]):0;


}

static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){

  return (ShmChunk *)(pDb->apShm[iChunk]);
}

static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){
  assert( *pRc==LSM_OK );
  if( iChunk<pDb->nShm || LSM_OK==(*pRc = lsmShmCacheChunks(pDb, iChunk+1)) ){
    return (ShmChunk *)(pDb->apShm[iChunk]);
  }
  return 0;
}


#ifndef NDEBUG
static void assertIsWorkingChild(
  lsm_db *db, 
  TreeNode *pNode, 
  TreeNode *pParent, 
  int iCell
){
  TreeNode *p;
  int rc = LSM_OK;
  u32 iPtr = getChildPtr(pParent, WORKING_VERSION, iCell);
  p = treeShmptr(db, iPtr);
  assert( p==pNode );
}
#else
# define assertIsWorkingChild(w,x,y,z)
#endif

/* Values for the third argument to treeShmkey(). */
#define TKV_LOADKEY  1
#define TKV_LOADVAL  2

static TreeKey *treeShmkey(
  lsm_db *pDb,                    /* Database handle */
  u32 iPtr,                       /* Shmptr to TreeKey struct */
  int eLoad,                      /* Either zero or a TREEKEY_LOADXXX value */
  TreeBlob *pBlob,                /* Used if dynamic memory is required */
  int *pRc                        /* IN/OUT: Error code */
){
  TreeKey *pRet;

  assert( eLoad==TKV_LOADKEY || eLoad==TKV_LOADVAL );
  pRet = (TreeKey *)treeShmptr(pDb, iPtr);
  if( pRet ){
    int nReq;                     /* Bytes of space required at pRet */
    int nAvail;                   /* Bytes of space available at pRet */

    nReq = sizeof(TreeKey) + pRet->nKey;
    if( eLoad==TKV_LOADVAL && pRet->nValue>0 ){
      nReq += pRet->nValue;
    }
    assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
    nAvail = LSM_SHM_CHUNK_SIZE - (iPtr & (LSM_SHM_CHUNK_SIZE-1));

    if( nAvail<nReq ){
      if( tblobGrow(pDb, pBlob, nReq, pRc)==0 ){
        int nLoad = 0;
        while( *pRc==LSM_OK ){
          ShmChunk *pChunk;
          void *p = treeShmptr(pDb, iPtr);
          int n = LSM_MIN(nAvail, nReq-nLoad);

          memcpy(&pBlob->a[nLoad], p, n);
          nLoad += n;
          if( nLoad==nReq ) break;

          pChunk = treeShmChunk(pDb, treeOffsetToChunk(iPtr));
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
  const char *zSpace = "                                           ";
  int i;
  int rc = LSM_OK;
  LsmString s;
  TreeNode *pNode;
  TreeBlob b = {0, 0};

  pNode = (TreeNode *)treeShmptr(pDb, iNode, &rc);

  if( nHeight==0 ){
    /* Append the nIndent bytes of space to string s. */
    lsmStringInit(&s, pDb->pEnv);

    /* Append each key to string s. */
    for(i=0; i<3; i++){







|







483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
  const char *zSpace = "                                           ";
  int i;
  int rc = LSM_OK;
  LsmString s;
  TreeNode *pNode;
  TreeBlob b = {0, 0};

  pNode = (TreeNode *)treeShmptr(pDb, iNode);

  if( nHeight==0 ){
    /* Append the nIndent bytes of space to string s. */
    lsmStringInit(&s, pDb->pEnv);

    /* Append each key to string s. */
    for(i=0; i<3; i++){
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
        pNext->iShmid = (pDb->treehdr.iNextShmid++);
      }else{
        *pRc = rc;
        return 0;
      }

      /* Set the header values for the chunk just finished */
      pHdr = (ShmChunk *)treeShmptr(pDb, iChunk*CHUNK_SIZE, pRc);
      pHdr->iNext = iNext;

      /* Advance to the next chunk */
      iWrite = iNext * CHUNK_SIZE + CHUNK_HDR;
    }

    /* Allocate space at iWrite. */







|







669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
        pNext->iShmid = (pDb->treehdr.iNextShmid++);
      }else{
        *pRc = rc;
        return 0;
      }

      /* Set the header values for the chunk just finished */
      pHdr = (ShmChunk *)treeShmptr(pDb, iChunk*CHUNK_SIZE);
      pHdr->iNext = iNext;

      /* Advance to the next chunk */
      iWrite = iNext * CHUNK_SIZE + CHUNK_HDR;
    }

    /* Allocate space at iWrite. */
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
/*
** Allocate and zero nByte bytes of space within the *-shm file.
*/
static void *treeShmallocZero(lsm_db *pDb, int nByte, u32 *piPtr, int *pRc){
  u32 iPtr;
  void *p;
  iPtr = treeShmalloc(pDb, 1, nByte, pRc);
  p = treeShmptr(pDb, iPtr, pRc);
  if( p ){
    assert( *pRc==LSM_OK );
    memset(p, 0, nByte);
    *piPtr = iPtr;
  }
  return p;
}







|







691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
/*
** Allocate and zero nByte bytes of space within the *-shm file.
*/
static void *treeShmallocZero(lsm_db *pDb, int nByte, u32 *piPtr, int *pRc){
  u32 iPtr;
  void *p;
  iPtr = treeShmalloc(pDb, 1, nByte, pRc);
  p = treeShmptr(pDb, iPtr);
  if( p ){
    assert( *pRc==LSM_OK );
    memset(p, 0, nByte);
    *piPtr = iPtr;
  }
  return p;
}
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
  u32 iPtr;
  int nRem;
  u8 *a;
  int n;

  /* Allocate space for the TreeKey structure itself */
  *piPtr = iPtr = treeShmalloc(pDb, 1, sizeof(TreeKey), pRc);
  p = treeShmptr(pDb, iPtr, pRc);
  if( *pRc ) return 0;
  p->nKey = nKey;
  p->nValue = nVal;

  /* Allocate and populate the space required for the key and value. */
  n = nRem = nKey;
  a = (u8 *)pKey;
  while( a ){
    while( nRem>0 ){
      u8 *aAlloc;
      int nAlloc;
      u32 iWrite;

      iWrite = (pDb->treehdr.iWrite & (LSM_SHM_CHUNK_SIZE-1));
      iWrite = LSM_MAX(iWrite, LSM_SHM_CHUNK_HDR);
      nAlloc = LSM_MIN((LSM_SHM_CHUNK_SIZE-iWrite), nRem);

      aAlloc = treeShmptr(pDb, treeShmalloc(pDb, 0, nAlloc, pRc), pRc);
      if( aAlloc==0 ) break;
      memcpy(aAlloc, &a[n-nRem], nAlloc);
      nRem -= nAlloc;
    }
    a = pVal;
    n = nRem = nVal;
    pVal = 0;







|

















|







723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
  u32 iPtr;
  int nRem;
  u8 *a;
  int n;

  /* Allocate space for the TreeKey structure itself */
  *piPtr = iPtr = treeShmalloc(pDb, 1, sizeof(TreeKey), pRc);
  p = treeShmptr(pDb, iPtr);
  if( *pRc ) return 0;
  p->nKey = nKey;
  p->nValue = nVal;

  /* Allocate and populate the space required for the key and value. */
  n = nRem = nKey;
  a = (u8 *)pKey;
  while( a ){
    while( nRem>0 ){
      u8 *aAlloc;
      int nAlloc;
      u32 iWrite;

      iWrite = (pDb->treehdr.iWrite & (LSM_SHM_CHUNK_SIZE-1));
      iWrite = LSM_MAX(iWrite, LSM_SHM_CHUNK_HDR);
      nAlloc = LSM_MIN((LSM_SHM_CHUNK_SIZE-iWrite), nRem);

      aAlloc = treeShmptr(pDb, treeShmalloc(pDb, 0, nAlloc, pRc));
      if( aAlloc==0 ) break;
      memcpy(aAlloc, &a[n-nRem], nAlloc);
      nRem -= nAlloc;
    }
    a = pVal;
    n = nRem = nVal;
    pVal = 0;
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
  /* Cursor currently points to a leaf node. */
  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    if( iCell<3 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell], &rc);
      assert( rc==LSM_OK );
      return ((pKey->flags & LSM_END_DELETE) ? 1 : 0);
    }
    iNode--;
    iCell = pCsr->aiCell[iNode];
  }








|







1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
  /* Cursor currently points to a leaf node. */
  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    if( iCell<3 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
      assert( rc==LSM_OK );
      return ((pKey->flags & LSM_END_DELETE) ? 1 : 0);
    }
    iNode--;
    iCell = pCsr->aiCell[iNode];
  }

1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    int iCell = pCsr->aiCell[iNode]-1;
    if( iCell>=0 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell], &rc);
      assert( rc==LSM_OK );
      return ((pKey->flags & LSM_START_DELETE) ? 1 : 0);
    }
    iNode--;
  }

  return 0;







|







1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    int iCell = pCsr->aiCell[iNode]-1;
    if( iCell>=0 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
      assert( rc==LSM_OK );
      return ((pKey->flags & LSM_START_DELETE) ? 1 : 0);
    }
    iNode--;
  }

  return 0;
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653

    if( iPSlot>0 && getChildPtr(pParent, WORKING_VERSION, iPSlot-1) ){
      iDir = -1;
    }else{
      iDir = +1;
    }
    iPeer = getChildPtr(pParent, WORKING_VERSION, iPSlot+iDir);
    pPeer = (TreeNode *)treeShmptr(db, iPeer, &rc);
    assertIsWorkingChild(db, pNode, pParent, iPSlot);

    /* Allocate the first new leaf node. This is always required. */
    if( bLeaf ){
      pNew1 = (TreeNode *)newTreeLeaf(db, &iNew1, &rc);
    }else{
      pNew1 = (TreeNode *)newTreeNode(db, &iNew1, &rc);







|







1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648

    if( iPSlot>0 && getChildPtr(pParent, WORKING_VERSION, iPSlot-1) ){
      iDir = -1;
    }else{
      iDir = +1;
    }
    iPeer = getChildPtr(pParent, WORKING_VERSION, iPSlot+iDir);
    pPeer = (TreeNode *)treeShmptr(db, iPeer);
    assertIsWorkingChild(db, pNode, pParent, iPSlot);

    /* Allocate the first new leaf node. This is always required. */
    if( bLeaf ){
      pNew1 = (TreeNode *)newTreeLeaf(db, &iNew1, &rc);
    }else{
      pNew1 = (TreeNode *)newTreeNode(db, &iNew1, &rc);
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
    int res = 0;                  /* Result of comparison function */
    int iNode = -1;
    while( iNodePtr ){
      TreeNode *pNode;            /* Node at location iNodePtr */
      int iTest;                  /* Index of second key to test (0 or 2) */
      TreeKey *pTreeKey;          /* Key to compare against */

      pNode = (TreeNode *)treeShmptr(pDb, iNodePtr, &rc);
      iNode++;
      pCsr->apTreeNode[iNode] = pNode;

      /* Compare (pKey/nKey) with the key in the middle slot of B-tree node
      ** pNode. The middle slot is never empty. If the comparison is a match,
      ** then the search is finished. Break out of the loop. */
      pTreeKey = treeShmkey(pDb, pNode->aiKeyPtr[1], TKV_LOADKEY, &b, &rc);







|







1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
    int res = 0;                  /* Result of comparison function */
    int iNode = -1;
    while( iNodePtr ){
      TreeNode *pNode;            /* Node at location iNodePtr */
      int iTest;                  /* Index of second key to test (0 or 2) */
      TreeKey *pTreeKey;          /* Key to compare against */

      pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
      iNode++;
      pCsr->apTreeNode[iNode] = pNode;

      /* Compare (pKey/nKey) with the key in the middle slot of B-tree node
      ** pNode. The middle slot is never empty. If the comparison is a match,
      ** then the search is finished. Break out of the loop. */
      pTreeKey = treeShmkey(pDb, pNode->aiKeyPtr[1], TKV_LOADKEY, &b, &rc);
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
  ** associated with it, descend to the left-most key on the left-most
  ** leaf of the sub-tree.  */
  if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
    do {
      u32 iNodePtr;
      pCsr->iNode++;
      iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
      pNode = (TreeNode *)treeShmptr(pDb, iNodePtr, &rc);
      pCsr->apTreeNode[pCsr->iNode] = pNode;
      iCell = pCsr->aiCell[pCsr->iNode] = (pNode->aiKeyPtr[0]==0);
    }while( pCsr->iNode < iLeaf );
  }

  /* Otherwise, the next key is found by following pointer up the tree 
  ** until there is a key immediately to the right of the pointer followed 







|







2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
  ** associated with it, descend to the left-most key on the left-most
  ** leaf of the sub-tree.  */
  if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
    do {
      u32 iNodePtr;
      pCsr->iNode++;
      iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
      pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
      pCsr->apTreeNode[pCsr->iNode] = pNode;
      iCell = pCsr->aiCell[pCsr->iNode] = (pNode->aiKeyPtr[0]==0);
    }while( pCsr->iNode < iLeaf );
  }

  /* Otherwise, the next key is found by following pointer up the tree 
  ** until there is a key immediately to the right of the pointer followed 
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
  ** associated with it, descend to the right-most key on the right-most
  ** leaf of the sub-tree.  */
  if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
    do {
      u32 iNodePtr;
      pCsr->iNode++;
      iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
      pNode = (TreeNode *)treeShmptr(pDb, iNodePtr, &rc);
      if( rc!=LSM_OK ) break;
      pCsr->apTreeNode[pCsr->iNode] = pNode;
      iCell = 1 + (pNode->aiKeyPtr[2]!=0) + (pCsr->iNode < iLeaf);
      pCsr->aiCell[pCsr->iNode] = iCell;
    }while( pCsr->iNode < iLeaf );
  }








|







2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
  ** associated with it, descend to the right-most key on the right-most
  ** leaf of the sub-tree.  */
  if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
    do {
      u32 iNodePtr;
      pCsr->iNode++;
      iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
      pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
      if( rc!=LSM_OK ) break;
      pCsr->apTreeNode[pCsr->iNode] = pNode;
      iCell = 1 + (pNode->aiKeyPtr[2]!=0) + (pCsr->iNode < iLeaf);
      pCsr->aiCell[pCsr->iNode] = iCell;
    }while( pCsr->iNode < iLeaf );
  }

2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
  treeCursorRestore(pCsr, 0);

  iNodePtr = pRoot->iRoot;
  while( iNodePtr ){
    int iCell;
    TreeNode *pNode;

    pNode = (TreeNode *)treeShmptr(pDb, iNodePtr, &rc);
    if( rc ) break;

    if( bLast ){
      iCell = ((pNode->aiKeyPtr[2]==0) ? 2 : 3);
    }else{
      iCell = ((pNode->aiKeyPtr[0]==0) ? 1 : 0);
    }







|







2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
  treeCursorRestore(pCsr, 0);

  iNodePtr = pRoot->iRoot;
  while( iNodePtr ){
    int iCell;
    TreeNode *pNode;

    pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
    if( rc ) break;

    if( bLast ){
      iCell = ((pNode->aiKeyPtr[2]==0) ? 2 : 3);
    }else{
      iCell = ((pNode->aiKeyPtr[0]==0) ? 1 : 0);
    }
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
}

int lsmTreeCursorFlags(TreeCursor *pCsr){
  int flags = 0;
  if( pCsr && pCsr->iNode>=0 ){
    int rc = LSM_OK;
    TreeKey *pKey = (TreeKey *)treeShmptr(pCsr->pDb,
        pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]], &rc
    );
    assert( rc==LSM_OK );
    flags = pKey->flags;
  }
  return flags;
}








|







2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
}

int lsmTreeCursorFlags(TreeCursor *pCsr){
  int flags = 0;
  if( pCsr && pCsr->iNode>=0 ){
    int rc = LSM_OK;
    TreeKey *pKey = (TreeKey *)treeShmptr(pCsr->pDb,
        pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]]
    );
    assert( rc==LSM_OK );
    flags = pKey->flags;
  }
  return flags;
}

2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
}

/*
** Roll back to mark pMark. Structure *pMark should have been previously
** populated by a call to lsmTreeMark().
*/
void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark){
  int rcdummy = LSM_OK;
  int iIdx;
  int nIdx;
  u32 iNext;
  ShmChunk *pChunk;
  u32 iChunk;
  u32 iShmid;

  /* Revert all required v2 pointers. */
  nIdx = intArraySize(&pDb->rollback);
  for(iIdx = pMark->iRollback; iIdx<nIdx; iIdx++){
    TreeNode *pNode;
    pNode = treeShmptr(pDb, intArrayEntry(&pDb->rollback, iIdx), &rcdummy);
    assert( pNode && rcdummy==LSM_OK );
    pNode->iV2 = 0;
    pNode->iV2Child = 0;
    pNode->iV2Ptr = 0;
  }
  intArrayTruncate(&pDb->rollback, pMark->iRollback);

  /* Restore the free-chunk list. */







<











|
|







2276
2277
2278
2279
2280
2281
2282

2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
}

/*
** Roll back to mark pMark. Structure *pMark should have been previously
** populated by a call to lsmTreeMark().
*/
void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark){

  int iIdx;
  int nIdx;
  u32 iNext;
  ShmChunk *pChunk;
  u32 iChunk;
  u32 iShmid;

  /* Revert all required v2 pointers. */
  nIdx = intArraySize(&pDb->rollback);
  for(iIdx = pMark->iRollback; iIdx<nIdx; iIdx++){
    TreeNode *pNode;
    pNode = treeShmptr(pDb, intArrayEntry(&pDb->rollback, iIdx));
    assert( pNode );
    pNode->iV2 = 0;
    pNode->iV2Child = 0;
    pNode->iV2Ptr = 0;
  }
  intArrayTruncate(&pDb->rollback, pMark->iRollback);

  /* Restore the free-chunk list. */