Tuesday, August 23, 2016

[389-commits] Branch '389-ds-base-1.2.11' - ldap/servers

ldap/servers/plugins/replication/cl5_api.c | 163 ++++++++++------
ldap/servers/plugins/replication/cl5_api.h | 2
ldap/servers/plugins/replication/repl5_replica_config.c | 2
3 files changed, 106 insertions(+), 61 deletions(-)

New commits:
commit 0d5afb2bbd6d639ec59e6bdf075bd61d3d72ef79
Author: Mark Reynolds <mreynolds@redhat.com>
Date: Tue Aug 23 12:06:30 2016 -0400

Ticket 48964 - cleanAllRUV changelog purging incorrectly
processes all backends

Bug Description: When the changelog was being purged of "cleaned" rids, the purge checked
all the backend changelogs, not just the one from which the
cleanAllRUV task originated. This could corrupt a different
backend's changelog if both backends used the same RID.

Fix Description: Purge the changelog associated with the backend that is specified in
the cleanAllRUV task. Also moved the "purging" to its own function,
and fixed a few compiler warnings.

https://fedorahosted.org/389/ticket/48964

Reviewed by: nhosoi(Thanks!)

(cherry picked from commit fda00435a7536c1ded72bb78a975f3370d09a3be)

diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
index 259c054..d91574f 100644
--- a/ldap/servers/plugins/replication/cl5_api.c
+++ b/ldap/servers/plugins/replication/cl5_api.c
@@ -344,10 +344,9 @@ static int _cl5CheckMissingCSN (const CSN *minCsn, const RUV *supplierRUV, CL5DB
static int _cl5TrimInit ();
static void _cl5TrimCleanup ();
static int _cl5TrimMain (void *param);
-static void _cl5DoTrimming (ReplicaId rid);
+static void _cl5DoTrimming ();
static PRBool _cl5CanTrim (time_t time, long *numToTrim);
static void _cl5TrimFile (Object *obj, long *numToTrim);
-
static void _cl5PurgeRID(Object *obj, ReplicaId cleaned_rid);
static int _cl5PurgeGetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key);
static int _cl5PurgeGetNextEntry (CL5Entry *entry, void *iterator, DBT *key);
@@ -3445,43 +3444,37 @@ static int _cl5TrimMain (void *param)
return 0;
}

-/* We remove an entry if it has been replayed to all consumers and
- and the number of entries in the changelog is larger than maxEntries
- or age of the entry is larger than maxAge.
- Also we can't purge entries which correspond to max csns in the
- supplier's ruv. Here is a example where we can get into trouble:
- The server is setup with time based trimming and no consumer's
- At some point all the entries are trimmed from the changelog.
- At a later point a consumer is added and initialized online
- Then a change is made on the supplier.
- To update the consumer, the supplier would attempt to locate
- the last change sent to the consumer in the changelog and will
- fail because the change was removed.
-
+/*
+ * We remove an entry if it has been replayed to all consumers and the number
+ * of entries in the changelog is larger than maxEntries or age of the entry
+ * is larger than maxAge. Also we can't purge entries which correspond to max
+ * csns in the supplier's ruv. Here is a example where we can get into trouble:
+ *
+ * The server is setup with time based trimming and no consumer's
+ * At some point all the entries are trimmed from the changelog.
+ * At a later point a consumer is added and initialized online.
+ * Then a change is made on the supplier.
+ * To update the consumer, the supplier would attempt to locate the last
+ * change sent to the consumer in the changelog and will fail because the
+ * change was removed.
*/
-
-static void _cl5DoTrimming (ReplicaId rid)
+static void _cl5DoTrimming ()
{
Object *obj;
long numToTrim;

PR_Lock (s_cl5Desc.dbTrim.lock);

- /* ONREPL We trim file by file which means that some files will be
- trimmed more often than other. We might have to fix that by, for
- example, randomizing starting point */
+ /*
+ * We are trimming all the changelogs. We trim file by file which
+ * means that some files will be trimmed more often than other. We
+ * might have to fix that by, for example, randomizing the starting
+ * point.
+ */
obj = objset_first_obj (s_cl5Desc.dbFiles);
- while (obj && (_cl5CanTrim ((time_t)0, &numToTrim) || rid))
+ while (obj && _cl5CanTrim ((time_t)0, &numToTrim))
{
- if (rid){
- /*
- * We are cleaning an invalid rid, and need to strip it
- * from the changelog.
- */
- _cl5PurgeRID (obj, rid);
- } else {
- _cl5TrimFile (obj, &numToTrim);
- }
+ _cl5TrimFile (obj, &numToTrim);
obj = objset_next_obj (s_cl5Desc.dbFiles, obj);
}

@@ -3494,6 +3487,43 @@ static void _cl5DoTrimming (ReplicaId rid)
}

/*
+ * We are purging a changelog after a cleanAllRUV task. Find the specific
+ * changelog for the backend that is being cleaned, and purge all the records
+ * with the cleaned rid.
+ */
+static void _cl5DoPurging (Replica *replica)
+{
+ ReplicaId rid = replica_get_rid(replica);
+ const Slapi_DN *sdn = replica_get_root(replica);
+ const char *replName = replica_get_name(replica);
+ char *replGen = replica_get_generation(replica);
+ char *fileName;
+ Object *obj;
+
+ PR_Lock (s_cl5Desc.dbTrim.lock);
+ fileName = _cl5MakeFileName (replName, replGen);
+ obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
+ if (obj) {
+ /* We found our changelog, now purge it */
+ _cl5PurgeRID (obj, rid);
+ object_release (obj);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "Purged rid (%d) from suffix (%s)\n",
+ rid, slapi_sdn_get_dn(sdn));
+ } else {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "Purge rid (%d) failed to find changelog file (%s) for suffix (%s)\n",
+ rid, fileName, slapi_sdn_get_dn(sdn));
+ }
+ PR_Unlock (s_cl5Desc.dbTrim.lock);
+
+ slapi_ch_free_string(&replGen);
+ slapi_ch_free_string(&fileName);
+
+ return;
+}
+
+/*
* If the rid is not set it is the very first iteration of the changelog.
* If the rid is set, we are doing another pass, and we have a key as our
* starting point.
@@ -4005,23 +4035,25 @@ static PRBool _cl5CanTrim (time_t time, long *numToTrim)
{
*numToTrim = 0;

- if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0)
+ if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0) {
return PR_FALSE;
-
+ }
if (s_cl5Desc.dbTrim.maxAge == 0)
{
*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries;
return ( *numToTrim > 0 );
}

- if (s_cl5Desc.dbTrim.maxEntries > 0 &&
- (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0)
- return PR_TRUE;
+ if (s_cl5Desc.dbTrim.maxEntries > 0 &&
+ (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0) {
+ return PR_TRUE;
+ }

- if (time)
+ if (time) {
return (current_time () - time > s_cl5Desc.dbTrim.maxAge);
- else
- return PR_TRUE;
+ } else {
+ return PR_TRUE;
+ }
}

static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
@@ -4034,7 +4066,6 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
char *pos;
char *agmt_name;

-
PR_ASSERT (replGen && obj);

file = (CL5DBFile*)object_get_data (obj);
@@ -4042,13 +4073,12 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)

agmt_name = get_thread_private_agmtname();

- if (purge) /* read purge vector entry */
- key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
- else /* read upper bound vector */
- key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
-
+ if (purge) { /* read purge vector entry */
+ key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
+ } else { /* read upper bound vector */
+ key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
+ }
key.size = CSN_STRSIZE;
-
data.flags = DB_DBT_MALLOC;

rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0);
@@ -4058,13 +4088,13 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
rc = _cl5ReadBervals (&vals, &pos, data.size);
slapi_ch_free (&(data.data));
if (rc != CL5_SUCCESS)
- goto done;
+ goto done;

- if (purge)
+ if (purge) {
rc = ruv_init_from_bervals(vals, &file->purgeRUV);
- else
+ } else {
rc = ruv_init_from_bervals(vals, &file->maxRUV);
-
+ }
if (rc != RUV_SUCCESS)
{
slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
@@ -4072,7 +4102,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
"RUV error %d\n", agmt_name, purge? "purge" : "upper bound", rc);

rc = CL5_RUV_ERROR;
- goto done;
+ goto done;
}

/* delete the entry; it is re-added when file
@@ -4084,7 +4114,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)

case DB_NOTFOUND: /* RUV is lost - need to construct */
rc = _cl5ConstructRUV (replGen, obj, purge);
- goto done;
+ goto done;

default: slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
"%s: _cl5ReadRUV: failed to get purge RUV; "
@@ -6858,12 +6888,14 @@ cl5CleanRUV(ReplicaId rid){
slapi_rwlock_unlock (s_cl5Desc.stLock);
}

-void trigger_cl_purging(ReplicaId rid){
+/*
+ * Create a thread to purge a changelog of cleaned RIDs
+ */
+void trigger_cl_purging(Replica *replica){
PRThread *trim_tid = NULL;

- slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_purging: rid (%d)\n",(int)rid);
trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_purging_thread,
- (void *)&rid, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
+ (void *)replica, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
if (NULL == trim_tid){
slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
@@ -6875,19 +6907,32 @@ void trigger_cl_purging(ReplicaId rid){
}
}

+/*
+ * Purge a changelog of entries that originated from a particular replica(rid)
+ */
void
trigger_cl_purging_thread(void *arg){
- ReplicaId rid = *(ReplicaId *)arg;
+ Replica *replica = (Replica *)arg;

- /* make sure we have a change log, and we aren't closing it */
- if(s_cl5Desc.dbState == CL5_STATE_CLOSED || s_cl5Desc.dbState == CL5_STATE_CLOSING){
+ /* Make sure we have a change log, and we aren't closing it */
+ if (replica == NULL ||
+ s_cl5Desc.dbState == CL5_STATE_CLOSED ||
+ s_cl5Desc.dbState == CL5_STATE_CLOSING) {
return;
}
+
+ /* Bump the changelog thread count */
if (CL5_SUCCESS != _cl5AddThread()) {
slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
- "trigger_cl_purging: failed to increment thread count "
+ "trigger_cl_purging: Abort - failed to increment thread count "
"NSPR error - %d\n", PR_GetError ());
+ return;
}
- _cl5DoTrimming(rid);
+
+ /* Purge the changelog */
+ _cl5DoPurging(replica);
_cl5RemoveThread();
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "trigger_cl_purging: purged changelog for (%s) rid (%d)\n",
+ slapi_sdn_get_dn(replica_get_root(replica)), replica_get_rid(replica));
}
diff --git a/ldap/servers/plugins/replication/cl5_api.h b/ldap/servers/plugins/replication/cl5_api.h
index b46a691..83683f5 100644
--- a/ldap/servers/plugins/replication/cl5_api.h
+++ b/ldap/servers/plugins/replication/cl5_api.h
@@ -493,6 +493,6 @@ int cl5WriteRUV();
int cl5DeleteRUV();
void cl5CleanRUV(ReplicaId rid);
void cl5NotifyCleanup(int rid);
-void trigger_cl_purging(ReplicaId rid);
+void trigger_cl_purging(Replica *replica);

No comments:

Post a Comment