ldap/servers/plugins/replication/cl5_clcache.c | 292 +++++++++++++++----------
ldap/servers/plugins/replication/cl5_clcache.h | 2
3 files changed, 214 insertions(+), 251 deletions(-)
New commits:
commit ec15a75ccdba713e4d74dcd760e3244ba43b6191
Author: Ludwig Krispenz <lkrispen@redhat.com>
Date: Wed Jun 8 11:28:07 2016 +0200
Ticket 48766 - Replication changelog can incorrectly skip over updates
Bug Description:
The changelog iterator uses a buffer to load and send changes. When the buffer was empty,
there were scenarios in which the starting point for reloading the buffer was set incorrectly
and changes were skipped.
Fix Description: reworked the clcache buffer code following the design at
http://www.port389.org/docs/389ds/design/changelog-processing-in-repl-state-sending-updates.html
https://fedorahosted.org/389/ticket/48766
Reviewed by: Mark and Thierry, thanks
(cherry picked from commit b08df71aa9eb18572f58e55e8d6b9ef7fe181773)
diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
index ae23353..3adaf86 100644
--- a/ldap/servers/plugins/replication/cl5_api.c
+++ b/ldap/servers/plugins/replication/cl5_api.c
@@ -5489,18 +5489,13 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
{
CLC_Buffer *clcache = NULL;
CL5DBFile *file;
- int i;
- CSN **csns = NULL;
CSN *startCSN = NULL;
- CSN *minCSN = NULL;
char csnStr [CSN_STRSIZE];
int rc = CL5_SUCCESS;
Object *supplierRuvObj = NULL;
RUV *supplierRuv = NULL;
- PRBool newReplica;
PRBool haveChanges = PR_FALSE;
char *agmt_name;
- ReplicaId rid;
PR_ASSERT (consumerRuv && replica && fileObj && iterator);
csnStr[0] = '\0';
@@ -5528,111 +5523,32 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
ruv_dump (supplierRuv, agmt_name, NULL);
}
- /*
- * get the sorted list of SupplierMinCSN (if no ConsumerMaxCSN)
- * and ConsumerMaxCSN for those RIDs where consumer is not
- * up-to-date.
- */
- csns = cl5BuildCSNList (consumerRuv, supplierRuv);
- if (csns == NULL)
- {
- rc = CL5_NOTFOUND;
- goto done;
- }
- /* iterate over elements of consumer's (and/or supplier's) ruv */
- for (i = 0; csns[i]; i++)
- {
- CSN *consumerMaxCSN = NULL;
-
- rid = csn_get_replicaid(csns[i]);
-
- /*
- * Skip CSN that is originated from the consumer.
- * If RID==65535, the CSN is originated from a
- * legacy consumer. In this case the supplier
- * and the consumer may have the same RID.
- */
- if ((rid == consumerRID && rid != MAX_REPLICA_ID) || (is_cleaned_rid(rid)) )
- continue;
+ /* initialize the changelog buffer and do the initial load */
- startCSN = csns[i];
+ rc = clcache_get_buffer ( &clcache, file->db, consumerRID, consumerRuv, supplierRuv );
+ if ( rc != 0 ) goto done;
- rc = clcache_get_buffer ( &clcache, file->db, consumerRID, consumerRuv, supplierRuv );
- if ( rc != 0 ) goto done;
-
- /* This is the first loading of this iteration. For replicas
- * already known to the consumer, we exclude the last entry
- * sent to the consumer by using DB_NEXT. However, for
- * replicas new to the consumer, we include the first change
- * ever generated by that replica.
- */
- newReplica = ruv_get_largest_csn_for_replica (consumerRuv, rid, &consumerMaxCSN);
- csn_free(&consumerMaxCSN);
- rc = clcache_load_buffer (clcache, startCSN, (newReplica ? DB_SET : DB_NEXT));
-
- /* there is a special case which can occur just after migration - in this case,
- the consumer RUV will contain the last state of the supplier before migration,
- but the supplier will have an empty changelog, or the supplier changelog will
- not contain any entries within the consumer min and max CSN - also, since
- the purge RUV contains no CSNs, the changelog has never been purged
- ASSUMPTIONS - it is assumed that the supplier had no pending changes to send
- to any consumers; that is, we can assume that no changes were lost due to
- either changelog purging or database reload - bug# 603061 - richm@netscape.com
- */
- if ((rc == DB_NOTFOUND) && !ruv_has_csns(file->purgeRUV))
- {
- char mincsnStr[CSN_STRSIZE];
-
- /* use the supplier min csn for the buffer start csn - we know
- this csn is in our changelog */
- if ((RUV_SUCCESS == ruv_get_min_csn_ext(supplierRuv, &minCSN, 1 /* ignore cleaned rids */)) &&
- minCSN)
- { /* must now free startCSN */
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- csn_as_string(startCSN, PR_FALSE, csnStr);
- csn_as_string(minCSN, PR_FALSE, mincsnStr);
- slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "%s: CSN %s not found and no purging, probably a reinit\n",
- agmt_name, csnStr);
- slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "%s: Will try to use supplier min CSN %s to load changelog\n",
- agmt_name, mincsnStr);
- }
- startCSN = minCSN;
- rc = clcache_load_buffer (clcache, startCSN, DB_SET);
- }
- else
- {
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- csn_as_string(startCSN, PR_FALSE, csnStr);
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
- "%s: CSN %s not found and no purging, probably a reinit\n",
- agmt_name, csnStr);
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
- "%s: Could not get the min csn from the supplier RUV\n",
- agmt_name);
- }
- rc = CL5_RUV_ERROR;
- goto done;
- }
- }
+ rc = clcache_load_buffer (clcache, &startCSN);
if (rc == 0) {
- haveChanges = PR_TRUE;
- rc = CL5_SUCCESS;
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- csn_as_string(startCSN, PR_FALSE, csnStr);
- slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "%s: CSN %s found, position set for replay\n", agmt_name, csnStr);
- }
- if (startCSN != csns[i]) {
- csn_free(&startCSN);
- }
- break;
+ haveChanges = PR_TRUE;
+ rc = CL5_SUCCESS;
+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
+ csn_as_string(startCSN, PR_FALSE, csnStr);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "%s: CSN %s found, position set for replay\n", agmt_name, csnStr);
+ }
}
- else if (rc == DB_NOTFOUND) /* entry not found */
- {
+ else if (rc == DB_NOTFOUND) {
+ /* buffer not loaded.
+ * either because no changes have to be sent ==> startCSN is NULL
+ * or the calculated startCSN cannot be found in the changelog
+ */
+ if (startCSN == NULL) {
+ rc = CL5_NOTFOUND;
+ goto done;
+ }
/* check whether this csn should be present */
rc = _cl5CheckMissingCSN (startCSN, supplierRuv, file);
if (rc == CL5_MISSING_DATA) /* we should have had the change but we don't */
@@ -5650,17 +5566,6 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
"%s: CSN %s not found, we aren't as up to date, or we purged\n",
agmt_name, csnStr);
}
- if (startCSN != csns[i]) {
- csn_free(&startCSN);
- }
- if (rc == CL5_MISSING_DATA) /* we should have had the change but we don't */
- {
- break;
- }
- else /* we are not as up to date or we purged */
- {
- continue;
- }
}
else
{
@@ -5669,34 +5574,29 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
"%s: Failed to retrieve change with CSN %s; db error - %d %s\n",
agmt_name, csnStr, rc, db_strerror(rc));
- if (startCSN != csns[i]) {
- csn_free(&startCSN);
- }
rc = CL5_DB_ERROR;
- break;
- }
+ }
- } /* end for */
/* setup the iterator */
if (haveChanges)
{
- *iterator = (CL5ReplayIterator*) slapi_ch_calloc (1, sizeof (CL5ReplayIterator));
+ *iterator = (CL5ReplayIterator*) slapi_ch_calloc (1, sizeof (CL5ReplayIterator));
- if (*iterator == NULL)
- {
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ if (*iterator == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
"%s: _cl5PositionCursorForReplay: failed to allocate iterator\n", agmt_name);
- rc = CL5_MEMORY_ERROR;
- goto done;
- }
+ rc = CL5_MEMORY_ERROR;
+ goto done;
+ }
/* ONREPL - should we make a copy of both RUVs here ?*/
- (*iterator)->fileObj = fileObj;
- (*iterator)->clcache = clcache; clcache = NULL;
- (*iterator)->consumerRID = consumerRID;
- (*iterator)->consumerRuv = consumerRuv;
+ (*iterator)->fileObj = fileObj;
+ (*iterator)->clcache = clcache; clcache = NULL;
+ (*iterator)->consumerRID = consumerRID;
+ (*iterator)->consumerRuv = consumerRuv;
(*iterator)->supplierRuvObj = supplierRuvObj;
}
else if (rc == CL5_SUCCESS)
@@ -5706,11 +5606,8 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
}
done:
- if ( clcache )
- clcache_return_buffer ( &clcache );
-
- if (csns)
- cl5DestroyCSNList (&csns);
+ if ( clcache )
+ clcache_return_buffer ( &clcache );
if (rc != CL5_SUCCESS)
{
diff --git a/ldap/servers/plugins/replication/cl5_clcache.c b/ldap/servers/plugins/replication/cl5_clcache.c
index b53d7c0..2d3bb28 100644
--- a/ldap/servers/plugins/replication/cl5_clcache.c
+++ b/ldap/servers/plugins/replication/cl5_clcache.c
@@ -39,6 +39,7 @@
#define DEFAULT_CLC_BUFFER_COUNT_MAX 0
#define DEFAULT_CLC_BUFFER_PAGE_COUNT 32
#define DEFAULT_CLC_BUFFER_PAGE_SIZE 1024
+#define WORK_CLC_BUFFER_PAGE_SIZE 8*DEFAULT_CLC_BUFFER_PAGE_SIZE
enum {
CLC_STATE_READY = 0, /* ready to iterate */
@@ -56,8 +57,9 @@ struct csn_seq_ctrl_block {
ReplicaId rid; /* RID this block serves */
CSN *consumer_maxcsn; /* Don't send CSN <= this */
CSN *local_maxcsn; /* Don't send CSN > this */
- CSN *prev_local_maxcsn; /* */
- int state; /* CLC_STATE_* */
+ CSN *prev_local_maxcsn; /* Copy of last state at buffer loading */
+ CSN *local_mincsn; /* Used to determin anchor csn*/
+ int state; /* CLC_STATE_* */
};
/*
@@ -70,6 +72,8 @@ struct clc_buffer {
ReplicaId buf_consumer_rid; /* help checking threshold csn */
const RUV *buf_consumer_ruv; /* used to skip change */
const RUV *buf_local_ruv; /* used to refresh local_maxcsn */
+ int buf_ignoreConsumerRID; /* how to handle updates from consumer */
+ int buf_load_cnt; /* number of loads for session */
/*
* fields for retriving data from DB
@@ -90,7 +94,6 @@ struct clc_buffer {
int buf_max_cscbs;
/* fields for debugging stat */
- int buf_load_cnt; /* number of loads for session */
int buf_record_cnt; /* number of changes for session */
int buf_record_skipped; /* number of changes skipped */
int buf_skipped_new_rid; /* number of changes skipped due to new_rid */
@@ -133,7 +136,8 @@ struct clc_pool {
static struct clc_pool *_pool = NULL; /* process's buffer pool */
/* static prototypes */
-static int clcache_adjust_anchorcsn ( CLC_Buffer *buf );
+static int clcache_initial_anchorcsn ( CLC_Buffer *buf, int *flag );
+static int clcache_adjust_anchorcsn ( CLC_Buffer *buf, int *flag );
static void clcache_refresh_consumer_maxcsns ( CLC_Buffer *buf );
static int clcache_refresh_local_maxcsns ( CLC_Buffer *buf );
static int clcache_skip_change ( CLC_Buffer *buf );
@@ -251,8 +255,23 @@ clcache_get_buffer ( CLC_Buffer **buf, DB *db, ReplicaId consumer_rid, const RUV
}
if ( NULL != *buf ) {
+ CSN *c_csn = NULL;
+ CSN *l_csn = NULL;
(*buf)->buf_consumer_ruv = consumer_ruv;
(*buf)->buf_local_ruv = local_ruv;
+ (*buf)->buf_load_flag = DB_MULTIPLE_KEY;
+ ruv_get_largest_csn_for_replica (consumer_ruv, consumer_rid, &c_csn);
+ ruv_get_largest_csn_for_replica (local_ruv, consumer_rid, &l_csn);
+ if (l_csn && csn_compare(l_csn, c_csn) > 0) {
+ /* the supplier has updates for the consumer RID and
+ * these updates are newer than on the consumer
+ */
+ (*buf)->buf_ignoreConsumerRID = 0;
+ } else {
+ (*buf)->buf_ignoreConsumerRID = 1;
+ }
+ csn_free(&c_csn);
+ csn_free(&l_csn);
}
else {
slapi_log_error ( SLAPI_LOG_FATAL, get_thread_private_agmtname(),
@@ -305,36 +324,25 @@ clcache_return_buffer ( CLC_Buffer **buf )
* historic reason.
*/
int
-clcache_load_buffer ( CLC_Buffer *buf, CSN *anchorcsn, int flag )
+clcache_load_buffer ( CLC_Buffer *buf, CSN **anchorCSN )
{
int rc = 0;
+ int flag = DB_NEXT;
+ if (anchorCSN) *anchorCSN = NULL;
clcache_refresh_local_maxcsns ( buf );
- /* Set the loading key */
- if ( anchorcsn ) {
+ if (buf->buf_load_cnt == 0 ) {
clcache_refresh_consumer_maxcsns ( buf );
- buf->buf_load_flag = DB_MULTIPLE_KEY;
- csn_as_string ( anchorcsn, 0, (char*)buf->buf_key.data );
- slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
- "session start: anchorcsn=%s\n", (char*)buf->buf_key.data );
- }
- else if ( csn_get_time(buf->buf_current_csn) == 0 ) {
- /* time == 0 means this csn has never been set */
- rc = DB_NOTFOUND;
- }
- else if ( clcache_adjust_anchorcsn ( buf ) != 0 ) {
- rc = DB_NOTFOUND;
- }
- else {
- csn_as_string ( buf->buf_current_csn, 0, (char*)buf->buf_key.data );
- slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
- "load next: anchorcsn=%s\n", (char*)buf->buf_key.data );
+ rc = clcache_initial_anchorcsn ( buf, &flag );
+ } else {
+ rc = clcache_adjust_anchorcsn ( buf, &flag );
}
if ( rc == 0 ) {
buf->buf_state = CLC_STATE_READY;
+ if (anchorCSN) *anchorCSN = buf->buf_current_csn;
rc = clcache_load_buffer_bulk ( buf, flag );
/* Reset some flag variables */
@@ -344,21 +352,15 @@ clcache_load_buffer ( CLC_Buffer *buf, CSN *anchorcsn, int flag )
buf->buf_cscbs[i]->state = CLC_STATE_READY;
}
}
- else if ( anchorcsn ) {
- /* Report error only when the missing is persistent */
- if ( buf->buf_missing_csn && csn_compare (buf->buf_missing_csn, anchorcsn) == 0 ) {
- if (!buf->buf_prev_missing_csn || csn_compare (buf->buf_prev_missing_csn, anchorcsn)) {
- slapi_log_error ( SLAPI_LOG_FATAL, buf->buf_agmt_name,
- "Can't locate CSN %s in the changelog (DB rc=%d). If replication stops, the consumer may need to be reinitialized.\n",
- (char*)buf->buf_key.data, rc );
- csn_dup_or_init_by_csn (&buf->buf_prev_missing_csn, anchorcsn);
- }
- }
- else {
- csn_dup_or_init_by_csn (&buf->buf_missing_csn, anchorcsn);
- }
+ else {
+ slapi_log_error ( SLAPI_LOG_FATAL, buf->buf_agmt_name,
+ "Can't locate CSN %s in the changelog (DB rc=%d). If replication stops, the consumer may need to be reinitialized.\n",
+ (char*)buf->buf_key.data, rc );
}
+ } else if (rc == CLC_STATE_DONE) {
+ rc = DB_NOTFOUND;
}
+
if ( rc != 0 ) {
slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
"clcache_load_buffer: rc=%d\n", rc );
@@ -483,7 +485,7 @@ clcache_get_next_change ( CLC_Buffer *buf, void **key, size_t *keylen, void **da
* We're done with the current buffer. Now load the next chunk.
*/
if ( NULL == *key && CLC_STATE_READY == buf->buf_state ) {
- rc = clcache_load_buffer ( buf, NULL, DB_NEXT );
+ rc = clcache_load_buffer ( buf, NULL );
if ( 0 == rc && buf->buf_record_ptr ) {
DB_MULTIPLE_KEY_NEXT ( buf->buf_record_ptr, &buf->buf_data,
*key, *keylen, *data, *datalen );
@@ -521,7 +523,6 @@ clcache_refresh_consumer_maxcsns ( CLC_Buffer *buf )
int i;
for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
- csn_free(&buf->buf_cscbs[i]->consumer_maxcsn);
ruv_get_largest_csn_for_replica (
buf->buf_consumer_ruv,
buf->buf_cscbs[i]->rid,
@@ -538,14 +539,11 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
int i;
rid = csn_get_replicaid ( rid_data->csn );
-
- /*
- * No need to create cscb for consumer's RID.
- * If RID==65535, the CSN is originated from a
- * legacy consumer. In this case the supplier
- * and the consumer may have the same RID.
+ /* we do not handle updates originated at the consumer if not required
+ * and we ignore RID which have been cleaned
*/
- if ( rid == buf->buf_consumer_rid && rid != MAX_REPLICA_ID )
+ if ( (rid == buf->buf_consumer_rid && buf->buf_ignoreConsumerRID) ||
+ is_cleaned_rid(rid) )
return rc;
for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
@@ -564,9 +562,20 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
}
buf->buf_cscbs[i]->rid = rid;
buf->buf_num_cscbs++;
+ /* this is the first time we have a local change for the RID
+ * we need to check what the consumer knows about it.
+ */
+ ruv_get_largest_csn_for_replica (
+ buf->buf_consumer_ruv,
+ buf->buf_cscbs[i]->rid,
+ &buf->buf_cscbs[i]->consumer_maxcsn );
}
+ if (buf->buf_cscbs[i]->local_maxcsn)
+ csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->prev_local_maxcsn, buf->buf_cscbs[i]->local_maxcsn );
+
csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->local_maxcsn, rid_data->csn );
+ csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->local_mincsn, rid_data->min_csn );
if ( buf->buf_cscbs[i]->consumer_maxcsn &&
csn_compare (buf->buf_cscbs[i]->consumer_maxcsn, rid_data->csn) >= 0 ) {
@@ -580,88 +589,147 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
static int
clcache_refresh_local_maxcsns ( CLC_Buffer *buf )
{
- int i;
- for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
- csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->prev_local_maxcsn,
- buf->buf_cscbs[i]->local_maxcsn );
- }
return ruv_enumerate_elements ( buf->buf_local_ruv, clcache_refresh_local_maxcsn, buf );
}
/*
* Algorithm:
*
- * 1. Snapshot local RUVs;
- * 2. Load buffer;
- * 3. Send to the consumer only those CSNs that are covered
- * by the RUVs snapshot taken in the first step;
- * All CSNs that are covered by the RUVs snapshot taken in the
- * first step are guaranteed in consecutive order for the respected
- * RIDs because of the the CSN pending list control;
- * A CSN that is not covered by the RUVs snapshot may be out of order
- * since it is possible that a smaller CSN might not have committed
- * yet by the time the buffer was loaded.
- * 4. Determine anchorcsn for each RID:
- *
- * Case| Local vs. Buffer | New Local | Next
- * | MaxCSN MaxCSN | MaxCSN | Anchor-CSN
- * ----+-------------------+-----------+----------------
- * 1 | Cl >= Cb | * | Cb
- * 2 | Cl < Cb | Cl | Cb
- * 3 | Cl < Cb | Cl2 | Cl
- *
- * 5. Determine anchorcsn for next load:
+ * 1. Determine anchorcsn for each RID:
+ * 2. Determine anchorcsn for next load:
* Anchor-CSN = min { all Next-Anchor-CSN, Buffer-MaxCSN }
*/
static int
-clcache_adjust_anchorcsn ( CLC_Buffer *buf )
+clcache_initial_anchorcsn ( CLC_Buffer *buf, int *flag )
{
PRBool hasChange = PR_FALSE;
struct csn_seq_ctrl_block *cscb;
int i;
+ CSN *anchorcsn = NULL;
if ( buf->buf_state == CLC_STATE_READY ) {
for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
+ CSN *rid_anchor = NULL;
+ int rid_flag = DB_NEXT;
cscb = buf->buf_cscbs[i];
- if ( cscb->state == CLC_STATE_UP_TO_DATE )
- continue;
+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
+ char prevmax[CSN_STRSIZE];
+ char local[CSN_STRSIZE];
+ char curr[CSN_STRSIZE];
+ char conmaxcsn[CSN_STRSIZE];
+ csn_as_string(cscb->prev_local_maxcsn, 0, prevmax);
+ csn_as_string(cscb->local_maxcsn, 0, local);
+ csn_as_string(buf->buf_current_csn, 0, curr);
+ csn_as_string(cscb->consumer_maxcsn, 0, conmaxcsn);
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_initial_anchorcsn" ,
+ "%s - (cscb %d - state %d) - csnPrevMax (%s) "
+ "csnMax (%s) csnBuf (%s) csnConsumerMax (%s)\n",
+ buf->buf_agmt_name, i, cscb->state, prevmax, local,
+ curr, conmaxcsn);
+ }
- /*
- * Case 3 unsafe ruv change: next buffer load should start
- * from where the maxcsn in the old ruv was. Since each
- * cscb has remembered the maxcsn sent to the consumer,
- * CSNs that may be loaded again could easily be skipped.
- */
- if ( cscb->prev_local_maxcsn &&
- csn_compare (cscb->prev_local_maxcsn, buf->buf_current_csn) < 0 &&
- csn_compare (cscb->local_maxcsn, cscb->prev_local_maxcsn) != 0 ) {
+ if (cscb->consumer_maxcsn == NULL) {
+ /* the consumer hasn't seen changes for this RID */
+ rid_anchor = cscb->local_mincsn;
+ rid_flag = DB_SET;
+ } else if ( csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
+ rid_anchor = cscb->consumer_maxcsn;
+ }
+
+ if (rid_anchor && (anchorcsn == NULL ||
+ ( csn_compare(rid_anchor, anchorcsn) < 0))) {
+ anchorcsn = rid_anchor;
+ *flag = rid_flag;
hasChange = PR_TRUE;
- cscb->state = CLC_STATE_READY;
- csn_init_by_csn ( buf->buf_current_csn, cscb->prev_local_maxcsn );
- csn_as_string ( cscb->prev_local_maxcsn, 0, (char*)buf->buf_key.data );
- slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
- "adjust anchor csn upon %s\n",
- ( cscb->state == CLC_STATE_CSN_GT_RUV ? "out of sequence csn" : "unsafe ruv change") );
- continue;
}
- /*
- * check if there are still changes to send for this RID
- * Assume we had compared the local maxcsn and the consumer
- * max csn before this function was called and hence the
- * cscb->state had been set accordingly.
- */
- if ( hasChange == PR_FALSE &&
- csn_compare (cscb->local_maxcsn, buf->buf_current_csn) > 0 ) {
+
+ }
+ }
+
+ if ( !hasChange ) {
+ buf->buf_state = CLC_STATE_DONE;
+ } else {
+ csn_init_by_csn(buf->buf_current_csn, anchorcsn);
+ csn_as_string(buf->buf_current_csn, 0, (char *)buf->buf_key.data);
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_initial_anchorcsn",
+ "anchor is now: %s\n", (char *)buf->buf_key.data);
+ }
+
+ return buf->buf_state;
+}
+
+static int
+clcache_adjust_anchorcsn ( CLC_Buffer *buf, int *flag )
+{
+ PRBool hasChange = PR_FALSE;
+ struct csn_seq_ctrl_block *cscb;
+ int i;
+ CSN *anchorcsn = NULL;
+
+ if ( buf->buf_state == CLC_STATE_READY ) {
+ for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
+ CSN *rid_anchor = NULL;
+ int rid_flag = DB_NEXT;
+ cscb = buf->buf_cscbs[i];
+
+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
+ char prevmax[CSN_STRSIZE];
+ char local[CSN_STRSIZE];
+ char curr[CSN_STRSIZE];
+ char conmaxcsn[CSN_STRSIZE];
+ csn_as_string(cscb->prev_local_maxcsn, 0, prevmax);
+ csn_as_string(cscb->local_maxcsn, 0, local);
+ csn_as_string(buf->buf_current_csn, 0, curr);
+ csn_as_string(cscb->consumer_maxcsn, 0, conmaxcsn);
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_adjust_anchorcsn" ,
+ "%s - (cscb %d - state %d) - csnPrevMax (%s) "
+ "csnMax (%s) csnBuf (%s) csnConsumerMax (%s)\n",
+ buf->buf_agmt_name, i, cscb->state, prevmax, local,
+ curr, conmaxcsn);
+ }
+
+ if (csn_compare (cscb->local_maxcsn, cscb->prev_local_maxcsn) == 0 ||
+ csn_compare (cscb->prev_local_maxcsn, buf->buf_current_csn) > 0 ) {
+ if (csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
+ rid_anchor = buf->buf_current_csn;
+ }
+ } else {
+ /* prev local max csn < csnBuffer AND different from local maxcsn */
+ if (cscb->prev_local_maxcsn == NULL) {
+ if (cscb->consumer_maxcsn == NULL) {
+ /* the consumer hasn't seen changes for this RID */
+ rid_anchor = cscb->local_mincsn;
+ rid_flag = DB_SET;
+ } else if ( csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
+ rid_anchor = cscb->consumer_maxcsn;
+ }
+ } else {
+ /* csnPrevMaxSup > 0 */
+ rid_anchor = cscb->consumer_maxcsn;
+ }
+ }
+
+ if (rid_anchor && (anchorcsn == NULL ||
+ ( csn_compare(rid_anchor, anchorcsn) < 0))) {
+ anchorcsn = rid_anchor;
+ *flag = rid_flag;
hasChange = PR_TRUE;
}
+
+
}
}
if ( !hasChange ) {
buf->buf_state = CLC_STATE_DONE;
+ } else {
+ csn_init_by_csn(buf->buf_current_csn, anchorcsn);
+ csn_as_string(buf->buf_current_csn, 0, (char *)buf->buf_key.data);
+ slapi_log_error(SLAPI_LOG_REPL, "clcache_adjust_anchorcsn",
+ "anchor is now: %s\n", (char *)buf->buf_key.data);
}
return buf->buf_state;
@@ -675,7 +743,6 @@ clcache_skip_change ( CLC_Buffer *buf )
int skip = 1;
int i;
char buf_cur_csn_str[CSN_STRSIZE];
- char oth_csn_str[CSN_STRSIZE];
do {
@@ -688,25 +755,14 @@ clcache_skip_change ( CLC_Buffer *buf )
* legacy consumer. In this case the supplier
* and the consumer may have the same RID.
*/
- if (rid == buf->buf_consumer_rid && rid != MAX_REPLICA_ID){
- CSN *cons_maxcsn = NULL;
-
- ruv_get_max_csn(buf->buf_consumer_ruv, &cons_maxcsn);
- if ( csn_compare ( buf->buf_current_csn, cons_maxcsn) > 0 ) {
- /*
- * The consumer must have been "restored" and needs this newer update.
- */
- skip = 0;
- } else if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
+ if (rid == buf->buf_consumer_rid && buf->buf_ignoreConsumerRID){
+ if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
csn_as_string(buf->buf_current_csn, 0, buf_cur_csn_str);
- csn_as_string(cons_maxcsn, 0, oth_csn_str);
slapi_log_error(SLAPI_LOG_REPL, buf->buf_agmt_name,
- "Skipping update because the changelog buffer current csn [%s] is "
- "less than or equal to the consumer max csn [%s]\n",
- buf_cur_csn_str, oth_csn_str);
+ "Skipping update because the consumer with Rid: [%d] is "
+ "ignored\n", rid);
buf->buf_skipped_csn_gt_cons_maxcsn++;
}
- csn_free(&cons_maxcsn);
break;
}
@@ -821,6 +877,7 @@ clcache_free_cscb ( struct csn_seq_ctrl_block ** cscb )
csn_free ( & (*cscb)->consumer_maxcsn );
csn_free ( & (*cscb)->local_maxcsn );
csn_free ( & (*cscb)->prev_local_maxcsn );
+ csn_free ( & (*cscb)->local_mincsn );
slapi_ch_free ( (void **) cscb );
}
@@ -1003,6 +1060,15 @@ clcache_cursor_get ( DBC *cursor, CLC_Buffer *buf, int flag )
{
int rc;
+ if (buf->buf_data.ulen > WORK_CLC_BUFFER_PAGE_SIZE) {
+ /*
+ * The buffer size had to be increased,
+ * reset it to a smaller working size,
+ * if not sufficient it will be increased again
+ */
+ buf->buf_data.ulen = WORK_CLC_BUFFER_PAGE_SIZE;
+ }
+
rc = cursor->c_get ( cursor,
& buf->buf_key,
& buf->buf_data,
diff --git a/ldap/servers/plugins/replication/cl5_clcache.h b/ldap/servers/plugins/replication/cl5_clcache.h
index 4c459ab..75b2817 100644
--- a/ldap/servers/plugins/replication/cl5_clcache.h
+++ b/ldap/servers/plugins/replication/cl5_clcache.h
@@ -23,7 +23,7 @@ typedef struct clc_buffer CLC_Buffer;
int clcache_init ( DB_ENV **dbenv );
void clcache_set_config ();
int clcache_get_buffer ( CLC_Buffer **buf, DB *db, ReplicaId consumer_rid, const RUV *consumer_ruv, const RUV *local_ruv );
-int clcache_load_buffer ( CLC_Buffer *buf, CSN *startCSN, int flag );
+int clcache_load_buffer ( CLC_Buffer *buf, CSN **anchorCSN );
void clcache_return_buffer ( CLC_Buffer **buf );
int clcache_get_next_change ( CLC_Buffer *buf, void **key, size_t *keylen, void **data, size_t *datalen, CSN **csn );
void clcache_destroy ();
--
389-commits mailing list
389-commits@lists.fedoraproject.org
https://lists.fedoraproject.org/admin/lists/389-commits@lists.fedoraproject.org
No comments:
Post a Comment