diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index c6d72f584bcac1ec3c6acd054302c10cfb25e57e..99a00701d9c75ad33d6b0d02a77645f8696e3a24 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -254,6 +254,7 @@ static int ZSTD_isUpdateAuthorized(ZSTD_cParameter param) case ZSTD_p_nbWorkers: case ZSTD_p_jobSize: case ZSTD_p_overlapSizeLog: + case ZSTD_p_rsyncable: case ZSTD_p_enableLongDistanceMatching: case ZSTD_p_ldmHashLog: case ZSTD_p_ldmMinMatch: @@ -315,6 +316,7 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v case ZSTD_p_jobSize: case ZSTD_p_overlapSizeLog: + case ZSTD_p_rsyncable: return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); case ZSTD_p_enableLongDistanceMatching: @@ -441,6 +443,13 @@ size_t ZSTD_CCtxParam_setParameter( return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_overlapSectionLog, value); #endif + case ZSTD_p_rsyncable : +#ifndef ZSTD_MULTITHREAD + return ERROR(parameter_unsupported); +#else + return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_rsyncable, value); +#endif + case ZSTD_p_enableLongDistanceMatching : CCtxParams->ldmParams.enableLdm = (value>0); return CCtxParams->ldmParams.enableLdm; @@ -544,6 +553,13 @@ size_t ZSTD_CCtxParam_getParameter( #else *value = CCtxParams->overlapSizeLog; break; +#endif + case ZSTD_p_rsyncable : +#ifndef ZSTD_MULTITHREAD + return ERROR(parameter_unsupported); +#else + *value = CCtxParams->rsyncable; + break; #endif case ZSTD_p_enableLongDistanceMatching : *value = CCtxParams->ldmParams.enableLdm; @@ -1160,7 +1176,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc, ZSTD_ldm_adjustParameters(¶ms.ldmParams, ¶ms.cParams); assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog); assert(params.ldmParams.hashEveryLog < 32); - zc->ldmState.hashPower = ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength); + zc->ldmState.hashPower = ZSTD_rollingHash_primePower(params.ldmParams.minMatchLength); } { size_t const windowSize = MAX(1, (size_t)MIN(((U64)1 << params.cParams.windowLog), pledgedSrcSize)); diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index ffbb53a78a9743e1632389a24f3349535c9c6d74..608efd5a0c4a7ec05a4bf3eff0f8bf78dd2b7bda 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -193,6 +193,7 @@ struct ZSTD_CCtx_params_s { unsigned nbWorkers; unsigned jobSize; unsigned overlapSizeLog; + unsigned rsyncable; /* Long distance matching parameters */ ldmParams_t ldmParams; @@ -492,6 +493,64 @@ MEM_STATIC size_t ZSTD_hashPtr(const void* p, U32 hBits, U32 mls) } } +/** ZSTD_ipow() : + * Return base^exponent. + */ +static U64 ZSTD_ipow(U64 base, U64 exponent) +{ + U64 power = 1; + while (exponent) { + if (exponent & 1) power *= base; + exponent >>= 1; + base *= base; + } + return power; +} + +#define ZSTD_ROLL_HASH_CHAR_OFFSET 10 + +/** ZSTD_rollingHash_append() : + * Add the buffer to the hash value. + */ +static U64 ZSTD_rollingHash_append(U64 hash, void const* buf, size_t size) +{ + BYTE const* istart = (BYTE const*)buf; + size_t pos; + for (pos = 0; pos < size; ++pos) { + hash *= prime8bytes; + hash += istart[pos] + ZSTD_ROLL_HASH_CHAR_OFFSET; + } + return hash; +} + +/** ZSTD_rollingHash_compute() : + * Compute the rolling hash value of the buffer. + */ +MEM_STATIC U64 ZSTD_rollingHash_compute(void const* buf, size_t size) +{ + return ZSTD_rollingHash_append(0, buf, size); +} + +/** ZSTD_rollingHash_primePower() : + * Compute the primePower to be passed to ZSTD_rollingHash_rotate() for a hash + * over a window of length bytes. + */ +MEM_STATIC U64 ZSTD_rollingHash_primePower(U32 length) +{ + return ZSTD_ipow(prime8bytes, length - 1); +} + +/** ZSTD_rollingHash_rotate() : + * Rotate the rolling hash by one byte. + */ +MEM_STATIC U64 ZSTD_rollingHash_rotate(U64 hash, BYTE toRemove, BYTE toAdd, U64 primePower) +{ + hash -= (toRemove + ZSTD_ROLL_HASH_CHAR_OFFSET) * primePower; + hash *= prime8bytes; + hash += toAdd + ZSTD_ROLL_HASH_CHAR_OFFSET; + return hash; +} + /*-************************************* * Round buffer management ***************************************/ diff --git a/lib/compress/zstd_ldm.c b/lib/compress/zstd_ldm.c index 6238ddecf24f90e373cbba9c13b9487153e7f878..3f180ddbc5a49aee0e3bfdc045e5627db31d1885 100644 --- a/lib/compress/zstd_ldm.c +++ b/lib/compress/zstd_ldm.c @@ -143,56 +143,6 @@ static void ZSTD_ldm_makeEntryAndInsertByTag(ldmState_t* ldmState, } } -/** ZSTD_ldm_getRollingHash() : - * Get a 64-bit hash using the first len bytes from buf. - * - * Giving bytes s = s_1, s_2, ... s_k, the hash is defined to be - * H(s) = s_1*(a^(k-1)) + s_2*(a^(k-2)) + ... + s_k*(a^0) - * - * where the constant a is defined to be prime8bytes. - * - * The implementation adds an offset to each byte, so - * H(s) = (s_1 + HASH_CHAR_OFFSET)*(a^(k-1)) + ... */ -static U64 ZSTD_ldm_getRollingHash(const BYTE* buf, U32 len) -{ - U64 ret = 0; - U32 i; - for (i = 0; i < len; i++) { - ret *= prime8bytes; - ret += buf[i] + LDM_HASH_CHAR_OFFSET; - } - return ret; -} - -/** ZSTD_ldm_ipow() : - * Return base^exp. */ -static U64 ZSTD_ldm_ipow(U64 base, U64 exp) -{ - U64 ret = 1; - while (exp) { - if (exp & 1) { ret *= base; } - exp >>= 1; - base *= base; - } - return ret; -} - -U64 ZSTD_ldm_getHashPower(U32 minMatchLength) { - DEBUGLOG(4, "ZSTD_ldm_getHashPower: mml=%u", minMatchLength); - assert(minMatchLength >= ZSTD_LDM_MINMATCH_MIN); - return ZSTD_ldm_ipow(prime8bytes, minMatchLength - 1); -} - -/** ZSTD_ldm_updateHash() : - * Updates hash by removing toRemove and adding toAdd. */ -static U64 ZSTD_ldm_updateHash(U64 hash, BYTE toRemove, BYTE toAdd, U64 hashPower) -{ - hash -= ((toRemove + LDM_HASH_CHAR_OFFSET) * hashPower); - hash *= prime8bytes; - hash += toAdd + LDM_HASH_CHAR_OFFSET; - return hash; -} - /** ZSTD_ldm_countBackwardsMatch() : * Returns the number of bytes that match backwards before pIn and pMatch. * @@ -261,9 +211,9 @@ static U64 ZSTD_ldm_fillLdmHashTable(ldmState_t* state, const BYTE* cur = lastHashed + 1; while (cur < iend) { - rollingHash = ZSTD_ldm_updateHash(rollingHash, cur[-1], - cur[ldmParams.minMatchLength-1], - state->hashPower); + rollingHash = ZSTD_rollingHash_rotate(rollingHash, cur[-1], + cur[ldmParams.minMatchLength-1], + state->hashPower); ZSTD_ldm_makeEntryAndInsertByTag(state, rollingHash, hBits, (U32)(cur - base), ldmParams); @@ -324,11 +274,11 @@ static size_t ZSTD_ldm_generateSequences_internal( size_t forwardMatchLength = 0, backwardMatchLength = 0; ldmEntry_t* bestEntry = NULL; if (ip != istart) { - rollingHash = ZSTD_ldm_updateHash(rollingHash, lastHashed[0], - lastHashed[minMatchLength], - hashPower); + rollingHash = ZSTD_rollingHash_rotate(rollingHash, lastHashed[0], + lastHashed[minMatchLength], + hashPower); } else { - rollingHash = ZSTD_ldm_getRollingHash(ip, minMatchLength); + rollingHash = ZSTD_rollingHash_compute(ip, minMatchLength); } lastHashed = ip; diff --git a/lib/compress/zstd_ldm.h b/lib/compress/zstd_ldm.h index 21fba4d591a4e4c2e0892dc8590ae497d38becfb..5310e174d561febf1f529573fbdf3851d6ab8435 100644 --- a/lib/compress/zstd_ldm.h +++ b/lib/compress/zstd_ldm.h @@ -86,10 +86,6 @@ size_t ZSTD_ldm_getTableSize(ldmParams_t params); */ size_t ZSTD_ldm_getMaxNbSeq(ldmParams_t params, size_t maxChunkSize); -/** ZSTD_ldm_getTableSize() : - * Return prime8bytes^(minMatchLength-1) */ -U64 ZSTD_ldm_getHashPower(U32 minMatchLength); - /** ZSTD_ldm_adjustParameters() : * If the params->hashEveryLog is not set, set it to its default value based on * windowLog and params->hashLog. diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index f4aba1d2c494363eba9c83696df80c3b62080976..43afbc1b923c3c6480cdf5e2bdc90d5d075913db 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -471,7 +471,7 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog); assert(params.ldmParams.hashEveryLog < 32); serialState->ldmState.hashPower = - ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength); + ZSTD_rollingHash_primePower(params.ldmParams.minMatchLength); } else { memset(¶ms.ldmParams, 0, sizeof(params.ldmParams)); } @@ -777,6 +777,14 @@ typedef struct { static const roundBuff_t kNullRoundBuff = {NULL, 0, 0}; +#define RSYNC_LENGTH 32 + +typedef struct { + U64 hash; + U64 hitMask; + U64 primePower; +} rsyncState_t; + struct ZSTDMT_CCtx_s { POOL_ctx* factory; ZSTDMT_jobDescription* jobs; @@ -790,6 +798,7 @@ struct ZSTDMT_CCtx_s { inBuff_t inBuff; roundBuff_t roundBuff; serialState_t serial; + rsyncState_t rsync; unsigned singleBlockingThread; unsigned jobIDMask; unsigned doneJobID; @@ -988,6 +997,9 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value); params->overlapSizeLog = (value >= 9) ? 9 : value; return value; + case ZSTDMT_p_rsyncable : + params->rsyncable = (value == 0 ? 0 : 1); + return value; default : return ERROR(parameter_unsupported); } @@ -996,15 +1008,7 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value) { DEBUGLOG(4, "ZSTDMT_setMTCtxParameter"); - switch(parameter) - { - case ZSTDMT_p_jobSize : - return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); - case ZSTDMT_p_overlapSectionLog : - return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); - default : - return ERROR(parameter_unsupported); - } + return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); } size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value) @@ -1016,6 +1020,9 @@ size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, case ZSTDMT_p_overlapSectionLog: *value = mtctx->params.overlapSizeLog; break; + case ZSTDMT_p_rsyncable: + *value = mtctx->params.rsyncable; + break; default: return ERROR(parameter_unsupported); } @@ -1381,6 +1388,16 @@ size_t ZSTDMT_initCStream_internal( if (mtctx->targetSectionSize == 0) { mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params); } + if (params.rsyncable) { + /* Aim for the targetsectionSize as the average job size. */ + U32 const jobSizeMB = (U32)(mtctx->targetSectionSize >> 20); + U32 const rsyncBits = ZSTD_highbit32(jobSizeMB) + 20; + assert(jobSizeMB >= 1); + DEBUGLOG(4, "rsyncLog = %u", rsyncBits); + mtctx->rsync.hash = 0; + mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1; + mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH); + } if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */ DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize); DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); @@ -1818,6 +1835,80 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) return 1; } +typedef struct { + size_t toLoad; /* The number of bytes to load from the input. */ + int flush; /* Boolean declaring if we must flush because we found a synchronization point. */ +} syncPoint_t; + +/** + * Searches through the input for a synchronization point. If one is found, we + * will instruct the caller to flush, and return the number of bytes to load. + * Otherwise, we will load as many bytes as possible and instruct the caller + * to continue as normal. + */ +static syncPoint_t findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input) { + BYTE const* const istart = (BYTE const*)input.src + input.pos; + U64 const primePower = mtctx->rsync.primePower; + U64 const hitMask = mtctx->rsync.hitMask; + + syncPoint_t syncPoint; + U64 hash; + BYTE const* prev; + size_t pos; + + syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetSectionSize - mtctx->inBuff.filled); + syncPoint.flush = 0; + if (!mtctx->params.rsyncable) + /* Rsync is disabled. */ + return syncPoint; + if (mtctx->inBuff.filled + syncPoint.toLoad < RSYNC_LENGTH) + /* Not enough to compute the hash. + * We will miss any synchronization points in this RSYNC_LENGTH byte + * window. However, since it depends only in the internal buffers, if the + * state is already synchronized, we will remain synchronized. + * Additionally, the probability that we miss a synchronization point is + * low: RSYNC_LENGTH / targetSectionSize. + */ + return syncPoint; + /* Initialize the loop variables. */ + if (mtctx->inBuff.filled >= RSYNC_LENGTH) { + /* We have enough bytes buffered to initialize the hash. + * Start scanning at the beginning of the input. + */ + pos = 0; + prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH; + hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH); + } else { + /* We don't have enough bytes buffered to initialize the hash, but + * we know we have at least RSYNC_LENGTH bytes total. + * Start scanning after the first RSYNC_LENGTH bytes less the bytes + * already buffered. + */ + pos = RSYNC_LENGTH - mtctx->inBuff.filled; + prev = (BYTE const*)mtctx->inBuff.buffer.start - pos; + hash = ZSTD_rollingHash_compute(mtctx->inBuff.buffer.start, mtctx->inBuff.filled); + hash = ZSTD_rollingHash_append(hash, istart, pos); + } + /* Starting with the hash of the previous RSYNC_LENGTH bytes, roll + * through the input. If we hit a synchronization point, then cut the + * job off, and tell the compressor to flush the job. Otherwise, load + * all the bytes and continue as normal. + * If we go too long without a synchronization point (targetSectionSize) + * then a block will be emitted anyways, but this is okay, since if we + * are already synchronized we will remain synchronized. + */ + for (; pos < syncPoint.toLoad; ++pos) { + BYTE const toRemove = pos < RSYNC_LENGTH ? prev[pos] : istart[pos - RSYNC_LENGTH]; + /* if (pos >= RSYNC_LENGTH) assert(ZSTD_rollingHash_compute(istart + pos - RSYNC_LENGTH, RSYNC_LENGTH) == hash); */ + hash = ZSTD_rollingHash_rotate(hash, toRemove, istart[pos], primePower); + if ((hash & hitMask) == hitMask) { + syncPoint.toLoad = pos + 1; + syncPoint.flush = 1; + break; + } + } + return syncPoint; +} /** ZSTDMT_compressStream_generic() : * internal use only - exposed to be invoked from zstd_compress.c @@ -1844,7 +1935,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } /* single-pass shortcut (note : synchronous-mode) */ - if ( (mtctx->nextJobID == 0) /* just started */ + if ( (!mtctx->params.rsyncable) /* rsyncable mode is disabled */ + && (mtctx->nextJobID == 0) /* just started */ && (mtctx->inBuff.filled == 0) /* nothing buffered */ && (!mtctx->jobReady) /* no job already created */ && (endOp == ZSTD_e_end) /* end order */ @@ -1876,14 +1968,17 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start); } if (mtctx->inBuff.buffer.start != NULL) { - size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled); + syncPoint_t const syncPoint = findSynchronizationPoint(mtctx, *input); + if (syncPoint.flush && endOp == ZSTD_e_continue) { + endOp = ZSTD_e_flush; + } assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize); DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u", - (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize); - memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); - input->pos += toLoad; - mtctx->inBuff.filled += toLoad; - forwardInputProgress = toLoad>0; + (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize); + memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad); + input->pos += syncPoint.toLoad; + mtctx->inBuff.filled += syncPoint.toLoad; + forwardInputProgress = syncPoint.toLoad>0; } if ((input->pos < input->size) && (endOp == ZSTD_e_end)) endOp = ZSTD_e_flush; /* can't end now : not all input consumed */ diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 12ad9f899b57648b1466d17eb2494829463ef8e8..b6bcb9e5d4fc915196777157fcf712745bbdf86f 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -85,7 +85,8 @@ ZSTDLIB_API size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */ typedef enum { ZSTDMT_p_jobSize, /* Each job is compressed in parallel. By default, this value is dynamically determined depending on compression parameters. Can be set explicitly here. */ - ZSTDMT_p_overlapSectionLog /* Each job may reload a part of previous job to enhance compressionr ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window. This is a "sticky" parameter : its value will be re-used on next compression job */ + ZSTDMT_p_overlapSectionLog, /* Each job may reload a part of previous job to enhance compressionr ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window. This is a "sticky" parameter : its value will be re-used on next compression job */ + ZSTDMT_p_rsyncable /* Enables rsyncable mode. */ } ZSTDMT_parameter; /* ZSTDMT_setMTCtxParameter() : diff --git a/lib/zstd.h b/lib/zstd.h index c7e9215da50991648f4151032ff36f4ab0682653..6eb2dd83502a275fcf4f6e401735d1f9a5d48509 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1149,6 +1149,27 @@ typedef enum { * enum. See the comments on that enum for an * explanation of the feature. */ + ZSTD_p_rsyncable, /* Enables rsyncable mode, which makes compressed + * files more rsync friendly by adding periodic + * synchronization points to the compressed data. + * The target average block size is + * ZSTD_p_jobSize / 2. You can modify the job size + * to increase or decrease the granularity of the + * synchronization point. Once the jobSize is + * smaller than the window size, you will start to + * see degraded compression ratio. + * NOTE: This only works when multithreading is + * enabled. + * NOTE: You probably don't want to use this with + * long range mode, since that will decrease the + * effectiveness of the synchronization points, + * but your milage may vary. + * NOTE: Rsyncable mode will limit the maximum + * compression speed to approximately 400 MB/s. + * If your compression level is already running + * significantly slower than that (< 200 MB/s), + * the speed won't be significantly impacted. + */ } ZSTD_cParameter; diff --git a/programs/fileio.c b/programs/fileio.c index c24f4defbb9ad93a9c262c6abaa059f12264a339..2818b96e81871bae3bae58124aec9ab522fac924 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -307,6 +307,12 @@ void FIO_setAdaptiveMode(unsigned adapt) { EXM_THROW(1, "Adaptive mode is not compatible with single thread mode \n"); g_adaptiveMode = adapt; } +static U32 g_rsyncable = 0; +void FIO_setRsyncable(unsigned rsyncable) { + if ((rsyncable>0) && (g_nbWorkers==0)) + EXM_THROW(1, "Rsyncable mode is not compatible with single thread mode \n"); + g_rsyncable = rsyncable; +} static int g_minAdaptLevel = -50; /* initializing this value requires a constant, so ZSTD_minCLevel() doesn't work */ void FIO_setAdaptMin(int minCLevel) { @@ -550,6 +556,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, #ifdef ZSTD_MULTITHREAD DISPLAYLEVEL(5,"set nb workers = %u \n", g_nbWorkers); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbWorkers, g_nbWorkers) ); + CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_jobSize, g_blockSize) ); if ( (g_overlapLog == FIO_OVERLAP_LOG_NOTSET) && (cLevel == ZSTD_maxCLevel()) ) g_overlapLog = 9; /* full overlap */ @@ -557,6 +564,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, DISPLAYLEVEL(3,"set overlapLog = %u \n", g_overlapLog); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_overlapSizeLog, g_overlapLog) ); } + CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_rsyncable, g_rsyncable) ); #endif /* dictionary */ CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* set the value temporarily for dictionary loading, to adapt compression parameters */ diff --git a/programs/fileio.h b/programs/fileio.h index 4c7049cb7167fd196e699bdfabf535c1e6541fdf..8edb7dfe8293e4c9549b14e6805b3129bcad7f1f 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -65,6 +65,7 @@ void FIO_setNotificationLevel(unsigned level); void FIO_setOverlapLog(unsigned overlapLog); void FIO_setRemoveSrcFile(unsigned flag); void FIO_setSparseWrite(unsigned sparse); /**< 0: no sparse; 1: disable on stdout; 2: always enabled */ +void FIO_setRsyncable(unsigned rsyncable); /*-************************************* diff --git a/programs/zstd.1.md b/programs/zstd.1.md index c0c04698ddc2ce5c972c7a033e88c0e97b477dcf..4920ac018c8645bd1aece8e6ceb0acba1fb1e0a4 100644 --- a/programs/zstd.1.md +++ b/programs/zstd.1.md @@ -144,6 +144,14 @@ the last one takes effect. Due to the chaotic nature of dynamic adaptation, compressed result is not reproducible. _note_ : at the time of this writing, `--adapt` can remain stuck at low speed when combined with multiple worker threads (>=2). +* `--rsyncable` : + `zstd` will periodically synchronize the compression state to make the + compressed file more rsync-friendly. There is a negligible impact to + compression ratio, and the faster compression levels will see a small + compression speed hit. + This feature does not work with `--single-thread`. You probably don't want + to use it with long range mode, since it will decrease the effectiveness of + the synchronization points, but your milage may vary. * `-D file`: use `file` as Dictionary to compress or decompress FILE(s) * `--no-dictID`: diff --git a/programs/zstdcli.c b/programs/zstdcli.c index 153de961d96bc6e44aca341df20bcfa04f92bb77..9f908355f7f6bbff5bd9efe6b44ab0a44b79511f 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -143,6 +143,7 @@ static int usage_advanced(const char* programName) #ifdef ZSTD_MULTITHREAD DISPLAY( " -T# : spawns # compression threads (default: 1, 0==# cores) \n"); DISPLAY( " -B# : select size of each job (default: 0==automatic) \n"); + DISPLAY( " --rsyncable : compress using a rsync-friendly method (-B sets block size) \n"); #endif DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n"); DISPLAY( "--[no-]check : integrity check (default: enabled) \n"); @@ -475,6 +476,7 @@ int main(int argCount, const char* argv[]) adapt = 0, adaptMin = MINCLEVEL, adaptMax = MAXCLEVEL, + rsyncable = 0, nextArgumentIsOutFileName = 0, nextArgumentIsMaxDict = 0, nextArgumentIsDictID = 0, @@ -607,6 +609,7 @@ int main(int argCount, const char* argv[]) #ifdef ZSTD_LZ4COMPRESS if (!strcmp(argument, "--format=lz4")) { suffix = LZ4_EXTENSION; FIO_setCompressionType(FIO_lz4Compression); continue; } #endif + if (!strcmp(argument, "--rsyncable")) { rsyncable = 1; continue; } /* long commands with arguments */ #ifndef ZSTD_NODICT @@ -1052,6 +1055,7 @@ int main(int argCount, const char* argv[]) FIO_setAdaptiveMode(adapt); FIO_setAdaptMin(adaptMin); FIO_setAdaptMax(adaptMax); + FIO_setRsyncable(rsyncable); if (adaptMin > cLevel) cLevel = adaptMin; if (adaptMax < cLevel) cLevel = adaptMax; @@ -1060,7 +1064,7 @@ int main(int argCount, const char* argv[]) else operationResult = FIO_compressMultipleFilenames(filenameTable, filenameIdx, outFileName, suffix, dictFileName, cLevel, compressionParams); #else - (void)suffix; (void)adapt; (void)ultra; (void)cLevel; (void)ldmFlag; /* not used when ZSTD_NOCOMPRESS set */ + (void)suffix; (void)adapt; (void)rsyncable; (void)ultra; (void)cLevel; (void)ldmFlag; /* not used when ZSTD_NOCOMPRESS set */ DISPLAY("Compression not supported \n"); #endif } else { /* decompression or test */ diff --git a/tests/playTests.sh b/tests/playTests.sh index f35f5fee6ef5b38ceb12fc89b79c3b957e0bcc0d..99609a5ea01a10c0e8a5cc7050c1b1f667c9d541 100755 --- a/tests/playTests.sh +++ b/tests/playTests.sh @@ -836,6 +836,12 @@ $ECHO "===> test: --adapt must fail on incoherent bounds " ./datagen > tmp $ZSTD -f -vv --adapt=min=10,max=9 tmp && die "--adapt must fail on incoherent bounds" +$ECHO "\n===> rsyncable mode " +roundTripTest -g10M " --rsyncable" +roundTripTest -g10M " --rsyncable -B100K" +$ECHO "===> test: --rsyncable must fail with --single-thread" +$ZSTD -f -vv --rsyncable --single-thread tmp && die "--rsyncable must fail with --single-thread" + if [ "$1" != "--test-large-data" ]; then $ECHO "Skipping large data tests" diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index f47451a3c3d501b143c8ad90ab0a8ed3f3e9978f..2e076d7b752d6baa424a579cfb9e2ce6f6042d1f 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -1915,6 +1915,8 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_jobSize, (U32)FUZ_rLogLength(&lseed, jobLog), opaqueAPI) ); } } + /* Enable rsyncable mode 1 in 4 times. */ + setCCtxParameter(zc, cctxParams, ZSTD_p_rsyncable, (FUZ_rand(&lseed) % 4 == 0), opaqueAPI); if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_forceMaxWindow, FUZ_rand(&lseed) & 1, opaqueAPI) );