Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add API to register a compression-factory method with an lsm handle.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | compression-id
Files: files | file ages | folders
SHA1: 60908fd4d12c0678a8cd1462a4e66d680ebd7c11
User & Date: dan 2013-02-06 19:43:37.889
Context
2013-02-07
19:50
Add the INFO_COMPRESSION_ID request. And the factory method for providing compression/encryption functions. Leaf check-in: bb85de9cd3 user: dan tags: compression-id
2013-02-06
19:43
Add API to register a compression-factory method with an lsm handle. check-in: 60908fd4d1 user: dan tags: compression-id
19:03
Add a field to the database header to identify the compression scheme in use. check-in: 3bf1db9709 user: dan tags: compression-id
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsmInt.h.
334
335
336
337
338
339
340


341
342
343
344
345
346
347

  /* Worker context */
  Snapshot *pWorker;              /* Worker snapshot (or NULL) */
  Freelist *pFreelist;            /* See sortedNewToplevel() */
  int bUseFreelist;               /* True to use pFreelist */
  int bIncrMerge;                 /* True if currently doing a merge */



  /* Debugging message callback */
  void (*xLog)(void *, int, const char *);
  void *pLogCtx;

  /* Work done notification callback */
  void (*xWork)(lsm_db *, void *);
  void *pWorkCtx;







>
>







334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349

  /* Worker context */
  Snapshot *pWorker;              /* Worker snapshot (or NULL) */
  Freelist *pFreelist;            /* See sortedNewToplevel() */
  int bUseFreelist;               /* True to use pFreelist */
  int bIncrMerge;                 /* True if currently doing a merge */

  int bInFactory;                 /* True if within factory.xFactory() */

  /* Debugging message callback */
  void (*xLog)(void *, int, const char *);
  void *pLogCtx;

  /* Work done notification callback */
  void (*xWork)(lsm_db *, void *);
  void *pWorkCtx;
892
893
894
895
896
897
898


899
900
901
902
903
904
905
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId);

int lsmDbMultiProc(lsm_db *);
void lsmDbDeferredClose(lsm_db *, lsm_file *, LsmFile *);
LsmFile *lsmDbRecycleFd(lsm_db *);

int lsmWalkFreelist(lsm_db *, int, int (*)(void *, int, i64), void *);




/**************************************************************************
** functions in lsm_str.c
*/
void lsmStringInit(LsmString*, lsm_env *pEnv);
int lsmStringExtend(LsmString*, int);







>
>







894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId);

int lsmDbMultiProc(lsm_db *);
void lsmDbDeferredClose(lsm_db *, lsm_file *, LsmFile *);
LsmFile *lsmDbRecycleFd(lsm_db *);

int lsmWalkFreelist(lsm_db *, int, int (*)(void *, int, i64), void *);

int lsmCheckCompressionId(lsm_db *, u32);


/**************************************************************************
** functions in lsm_str.c
*/
void lsmStringInit(LsmString*, lsm_env *pEnv);
int lsmStringExtend(LsmString*, int);
Changes to src/lsm_ckpt.c.
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
      return LSM_PROTOCOL;
    }
  }

  rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker);
  if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase;

  if( rc==LSM_OK && pDb->pWorker->iCmpId!=pDb->compress.iId ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    rc = LSM_MISMATCH;
    pDb->pWorker = 0;
  }

#if 0
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
#endif
  return rc;
}







|
<
<
|







922
923
924
925
926
927
928
929


930
931
932
933
934
935
936
937
      return LSM_PROTOCOL;
    }
  }

  rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker);
  if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase;

  if( rc==LSM_OK ){


    rc = lsmCheckCompressionId(pDb, pDb->pWorker->iCmpId);
  }

#if 0
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
#endif
  return rc;
}
Changes to src/lsm_main.c.
340
341
342
343
344
345
346
347
348
349
350




351
352
353
354
355
356
357
        pDb->bMultiProc = *piVal = (*piVal!=0);
      }
      break;
    }

    case LSM_CONFIG_SET_COMPRESSION: {
      lsm_compress *p = va_arg(ap, lsm_compress *);
      if( pDb->iReader>=0 ){
        /* May not change compression schemes with an open transaction */
        rc = LSM_MISUSE_BKPT;
      }else{




        if( p->xBound==0 ){
          memset(&pDb->compress, 0, sizeof(lsm_compress));
          pDb->compress.iId = LSM_COMPRESSION_NONE;
        }else{
          memcpy(&pDb->compress, p, sizeof(lsm_compress));
        }
        rc = lsmFsConfigure(pDb);







|



>
>
>
>







340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
        pDb->bMultiProc = *piVal = (*piVal!=0);
      }
      break;
    }

    case LSM_CONFIG_SET_COMPRESSION: {
      lsm_compress *p = va_arg(ap, lsm_compress *);
      if( pDb->iReader>=0 && pDb->bInFactory==0 ){
        /* May not change compression schemes with an open transaction */
        rc = LSM_MISUSE_BKPT;
      }else{
        if( pDb->compress.xFree ){
          /* Invoke any destructor belonging to the current compression. */
          pDb->compress.xFree(pDb->compress.pCtx);
        }
        if( p->xBound==0 ){
          memset(&pDb->compress, 0, sizeof(lsm_compress));
          pDb->compress.iId = LSM_COMPRESSION_NONE;
        }else{
          memcpy(&pDb->compress, p, sizeof(lsm_compress));
        }
        rc = lsmFsConfigure(pDb);
Changes to src/lsm_shared.c.
920
921
922
923
924
925
926














927
928
929
930
931
932
933
/*
** Called when recovery is finished.
*/
int lsmFinishRecovery(lsm_db *pDb){
  lsmTreeEndTransaction(pDb, 1);
  return LSM_OK;
}















/*
** Begin a read transaction. This function is a no-op if the connection
** passed as the only argument already has an open read transaction.
*/
int lsmBeginReadTrans(lsm_db *pDb){
  const int MAX_READLOCK_ATTEMPTS = 10;







>
>
>
>
>
>
>
>
>
>
>
>
>
>







920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
/*
** Called when recovery is finished.
*/
int lsmFinishRecovery(lsm_db *pDb){
  lsmTreeEndTransaction(pDb, 1);
  return LSM_OK;
}

int lsmCheckCompressionId(lsm_db *pDb, u32 iReq){
  if( pDb->compress.iId!=iReq ){
    if( pDb->factory.xFactory ){
      pDb->bInFactory = 1;
      pDb->factory.xFactory(pDb->factory.pCtx, pDb, iReq);
      pDb->bInFactory = 0;
    }
    if( pDb->compress.iId!=iReq ){
      return LSM_MISMATCH;
    }
  }
  return LSM_OK;
}

/*
** Begin a read transaction. This function is a no-op if the connection
** passed as the only argument already has an open read transaction.
*/
int lsmBeginReadTrans(lsm_db *pDb){
  const int MAX_READLOCK_ATTEMPTS = 10;
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
          assert( pDb->iReader>=0 );

          /* Check that the client has the right compression hooks loaded.
          ** If not, set rc to LSM_MISMATCH.  */
          assert( rc!=LSM_OK || pDb->pClient->iCmpId!=LSM_COMPRESSION_EMPTY );
          if( rc==LSM_OK && pDb->pClient->iCmpId!=pDb->compress.iId ){
            rc = LSM_MISMATCH;
          }
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }

      if( rc==LSM_BUSY ){







|







993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
          assert( pDb->iReader>=0 );

          /* Check that the client has the right compression hooks loaded.
          ** If not, set rc to LSM_MISMATCH.  */
          assert( rc!=LSM_OK || pDb->pClient->iCmpId!=LSM_COMPRESSION_EMPTY );
          if( rc==LSM_OK && pDb->pClient->iCmpId!=pDb->compress.iId ){
            rc = lsmCheckCompressionId(pDb, pDb->pClient->iCmpId);
          }
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }

      if( rc==LSM_BUSY ){
Changes to test/lsm4.test.
43
44
45
46
47
48
49
50
51
52





53
54
55
56
57
  db config {set_compression rle}
  list [catch {db_fetch db 1} msg] $msg
} {1 {error in lsm_csr_open() - 50}}

do_test 1.4 {
  db close
  lsm_open db test.db 
breakpoint
  list [catch {db_fetch db 1} msg] $msg
} {1 {error in lsm_csr_open() - 50}}








finish_test








<


>
>
>
>
>





43
44
45
46
47
48
49

50
51
52
53
54
55
56
57
58
59
60
61
  db config {set_compression rle}
  list [catch {db_fetch db 1} msg] $msg
} {1 {error in lsm_csr_open() - 50}}

do_test 1.4 {
  db close
  lsm_open db test.db 

  list [catch {db_fetch db 1} msg] $msg
} {1 {error in lsm_csr_open() - 50}}

do_test 1.5 {
  db config {set_compression_factory true}
  list [db_fetch db 1] [db_fetch db 2]
} {abc def}



finish_test

Changes to test/test_lsm.c.
411
412
413
414
415
416
417
418

419
420
421
422
423
424
425
  Tcl_ResetResult(interp);
  return TCL_OK;
}

static int testConfigureSetCompression(
  Tcl_Interp *interp, 
  lsm_db *db, 
  Tcl_Obj *pCmp

){
  struct CompressionScheme {
    const char *zName;
    lsm_compress cmp;
  } aCmp[] = {
    { "encrypt", { 0, 43, 
        testCompressEncBound, testCompressEncCompress,







|
>







411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
  Tcl_ResetResult(interp);
  return TCL_OK;
}

static int testConfigureSetCompression(
  Tcl_Interp *interp, 
  lsm_db *db, 
  Tcl_Obj *pCmp,
  unsigned int iId
){
  struct CompressionScheme {
    const char *zName;
    lsm_compress cmp;
  } aCmp[] = {
    { "encrypt", { 0, 43, 
        testCompressEncBound, testCompressEncCompress,
434
435
436
437
438
439
440

441
442
443
444




445



446

447






















448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467

468
469
470
471
472
473
474
        testCompressNoopUncompress, testCompressNoopFree
    } },
    { 0, {0, 0, 0, 0, 0, 0} }
  };
  int iOpt;
  int rc;


  rc = Tcl_GetIndexFromObjStruct(
      interp, pCmp, aCmp, sizeof(aCmp[0]), "scheme", 0, &iOpt
  );
  if( rc!=TCL_OK ) return rc;








  rc = lsm_config(db, LSM_CONFIG_SET_COMPRESSION, &aCmp[iOpt].cmp);
























  return rc;
}

static int testConfigureLsm(Tcl_Interp *interp, lsm_db *db, Tcl_Obj *pObj){
  struct Lsmconfig {
    const char *zOpt;
    int eOpt;
  } aConfig[] = {
    { "autoflush",        LSM_CONFIG_AUTOFLUSH },
    { "page_size",        LSM_CONFIG_PAGE_SIZE },
    { "block_size",       LSM_CONFIG_BLOCK_SIZE },
    { "safety",           LSM_CONFIG_SAFETY },
    { "autowork",         LSM_CONFIG_AUTOWORK },
    { "autocheckpoint",   LSM_CONFIG_AUTOCHECKPOINT },
    { "mmap",             LSM_CONFIG_MMAP },
    { "use_log",          LSM_CONFIG_USE_LOG },
    { "automerge",        LSM_CONFIG_AUTOMERGE },
    { "max_freelist",     LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       LSM_CONFIG_MULTIPLE_PROCESSES },
    { "set_compression",  LSM_CONFIG_SET_COMPRESSION },

    { 0, 0 }
  };
  int nElem;
  int i;
  Tcl_Obj **apElem;
  int rc;








>
|
|
|
|
>
>
>
>
|
>
>
>

>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>








|
|
|
|
|
|
|
|
|
|
|
|
>







435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
        testCompressNoopUncompress, testCompressNoopFree
    } },
    { 0, {0, 0, 0, 0, 0, 0} }
  };
  int iOpt;
  int rc;

  if( interp ){
    rc = Tcl_GetIndexFromObjStruct(
        interp, pCmp, aCmp, sizeof(aCmp[0]), "scheme", 0, &iOpt
        );
    if( rc!=TCL_OK ) return rc;
  }else{
    int nOpt = sizeof(aCmp)/sizeof(aCmp[0]);
    for(iOpt=0; iOpt<nOpt; iOpt++){
      if( iId==aCmp[iOpt].cmp.iId ) break;
    }
    if( iOpt==nOpt ) return 0;
  }

  rc = lsm_config(db, LSM_CONFIG_SET_COMPRESSION, &aCmp[iOpt].cmp);
  return rc;
}

static int testCompressFactory(void *pCtx, lsm_db *db, unsigned int iId){
  return testConfigureSetCompression(0, db, 0, iId);
}

static int testConfigureSetFactory(
  Tcl_Interp *interp, 
  lsm_db *db, 
  Tcl_Obj *pArg
){
  lsm_compress_factory aFactory[2] = {
    { 0, 0, 0 },
    { 0, testCompressFactory, 0 },
  };
  int bArg = 0;
  int rc;

  rc = Tcl_GetBooleanFromObj(interp, pArg, &bArg);
  if( rc!=TCL_OK ) return rc;
  assert( bArg==1 || bArg==0 );

  rc = lsm_config(db, LSM_CONFIG_SET_COMPRESSION_FACTORY, &aFactory[bArg]);
  return rc;
}

static int testConfigureLsm(Tcl_Interp *interp, lsm_db *db, Tcl_Obj *pObj){
  struct Lsmconfig {
    const char *zOpt;
    int eOpt;
  } aConfig[] = {
    { "autoflush",               LSM_CONFIG_AUTOFLUSH },
    { "page_size",               LSM_CONFIG_PAGE_SIZE },
    { "block_size",              LSM_CONFIG_BLOCK_SIZE },
    { "safety",                  LSM_CONFIG_SAFETY },
    { "autowork",                LSM_CONFIG_AUTOWORK },
    { "autocheckpoint",          LSM_CONFIG_AUTOCHECKPOINT },
    { "mmap",                    LSM_CONFIG_MMAP },
    { "use_log",                 LSM_CONFIG_USE_LOG },
    { "automerge",               LSM_CONFIG_AUTOMERGE },
    { "max_freelist",            LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",              LSM_CONFIG_MULTIPLE_PROCESSES },
    { "set_compression",         LSM_CONFIG_SET_COMPRESSION },
    { "set_compression_factory", LSM_CONFIG_SET_COMPRESSION_FACTORY },
    { 0, 0 }
  };
  int nElem;
  int i;
  Tcl_Obj **apElem;
  int rc;

483
484
485
486
487
488
489
490



491
492
493
494
495
496
497
        Tcl_ResetResult(interp);
        Tcl_AppendResult(interp, "option \"", Tcl_GetString(apElem[i]), 
            "\" requires an argument", 0
            );
        rc = TCL_ERROR;
      }else{
        if( aConfig[iOpt].eOpt==LSM_CONFIG_SET_COMPRESSION ){
          rc = testConfigureSetCompression(interp, db, apElem[i+1]);



        }
        else {
          int iVal;
          rc = Tcl_GetIntFromObj(interp, apElem[i+1], &iVal);
          if( rc==TCL_OK ){
            lsm_config(db, aConfig[iOpt].eOpt, &iVal);
          }







|
>
>
>







516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
        Tcl_ResetResult(interp);
        Tcl_AppendResult(interp, "option \"", Tcl_GetString(apElem[i]), 
            "\" requires an argument", 0
            );
        rc = TCL_ERROR;
      }else{
        if( aConfig[iOpt].eOpt==LSM_CONFIG_SET_COMPRESSION ){
          rc = testConfigureSetCompression(interp, db, apElem[i+1], 0);
        }
        else if( aConfig[iOpt].eOpt==LSM_CONFIG_SET_COMPRESSION_FACTORY ){
          rc = testConfigureSetFactory(interp, db, apElem[i+1]);
        }
        else {
          int iVal;
          rc = Tcl_GetIntFromObj(interp, apElem[i+1], &iVal);
          if( rc==TCL_OK ){
            lsm_config(db, aConfig[iOpt].eOpt, &iVal);
          }