Wednesday, September 9, 2020

[389-commits] [389-ds-base] branch 389-ds-base-1.4.3 updated: Ticket 51190 - SyncRepl plugin provides a wrong cookie

This is an automated email from the git hooks/post-receive script.

tbordaz pushed a commit to branch 389-ds-base-1.4.3
in repository 389-ds-base.

The following commit(s) were added to refs/heads/389-ds-base-1.4.3 by this push:
new 3533589 Ticket 51190 - SyncRepl plugin provides a wrong cookie
3533589 is described below

commit 353358978a121fe61658e684b56c05f292884b91
Author: Thierry Bordaz <tbordaz@redhat.com>
AuthorDate: Fri Jul 10 10:56:21 2020 +0200

Ticket 51190 - SyncRepl plugin provides a wrong cookie

Bug description:
A sync repl thread is similar to a persistent search thread.
The server communicates with the sync repl thread through
an ordered queue of updates.
Updates are written to the queue by post-op callbacks.
The sync repl thread waits on and reads the queue, retrieves
the updates from the retroCL, checks whether the target entry
matches the request (scope/filter) and sends the entry/update
back to the sync repl client.
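The producer/consumer arrangement described above can be modelled in a few lines of Python. This is a hypothetical, simplified sketch (the `Update` class, `sync_repl_worker` and the `None` stop sentinel are illustrative assumptions, not plugin code): post-op callbacks put updates on a FIFO queue, and the consumer thread sends the matching entries along with their change numbers, so whatever order the queue holds is the order of the cookies the client sees.

```python
import queue
import threading
from dataclasses import dataclass


@dataclass
class Update:
    changenumber: int  # hypothetical: retro changelog change number of the update
    dn: str            # target entry DN


def sync_repl_worker(updates, matches, sent):
    """Consume updates in queue order and 'send' the ones matching the request.

    updates: queue.Queue of Update objects; None is used as a stop sentinel
    matches: predicate standing in for the scope/filter check
    sent: list collecting (dn, changenumber) pairs, i.e. what the cookies carry
    """
    while True:
        upd = updates.get()
        if upd is None:
            break
        if matches(upd.dn):
            sent.append((upd.dn, upd.changenumber))


q = queue.Queue()
sent = []
worker = threading.Thread(
    target=sync_repl_worker,
    args=(q, lambda dn: dn.startswith("uid="), sent),
)
worker.start()
# Post-op side: each callback enqueues its update when it completes
for cn, dn in [(1, "uid=a,ou=people"), (2, "cn=group1,ou=groups"), (3, "uid=b,ou=people")]:
    q.put(Update(cn, dn))
q.put(None)
worker.join()
print(sent)  # [('uid=a,ou=people', 1), ('uid=b,ou=people', 3)]
```

Because the consumer trusts the queue order, any mismatch between enqueue order and the order in which the server applied the updates shows up directly in the cookies.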

There are several issues with the order of the updates in
the queue:

(1) When an update generates nested updates (automember,
memberof, ...), the order of the updates in the queue does
not follow the order in which the updates were applied. As a
consequence, the cookie (containing the update number) can be
wrong: it can contain jumps, disorder and an invalid number (-1).

When an update fails (nested or primary update), none of the
updates should be pushed to the sync_repl queue.

(2) The plugin callbacks on updates are POSTOP, so if there
are two direct updates, the callback of the second update (and
of its nested updates) may be enqueued before the first update.
In that case the sync_repl thread may skip some updates and/or
fail to retrieve an update from the retroCL (cookie.update_no=-1).
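A client can detect the symptoms listed above by checking the change numbers extracted from successive cookies, which is what the new basic_test.py does with its strictly-increasing assertion. The helper below is a hypothetical sketch of that check: it flags invalid (non-positive, e.g. -1) and out-of-order numbers, but it deliberately does not flag gaps, since updates excluded by the search scope/filter legitimately skip change numbers.

```python
def cookie_problems(changenumbers):
    """Return (index, value, reason) tuples for suspicious cookie change numbers.

    Each change number must be positive and strictly greater than the
    largest valid one seen before it.
    """
    problems = []
    prev = 0
    for i, cn in enumerate(changenumbers):
        if cn <= 0:
            problems.append((i, cn, "invalid"))  # e.g. cookie.update_no == -1
            continue
        if cn <= prev:
            problems.append((i, cn, "out of order"))
        prev = max(prev, cn)
    return problems


print(cookie_problems([3, 4, 6, 5, -1, 7]))  # [(3, 5, 'out of order'), (4, -1, 'invalid')]
```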

Fix description:
The fix:
(1) implements a pending list of updates (in the thread
private space "get_thread_primary_op").
The first element of the pending list is the primary update;
the others are the nested updates.
A new betxn_preop callback registers each operation at
the end of the pending list with the state OPERATION_PL_PENDING.
This requires registering new callbacks (sync_betxn_preop_init).

During be_postop (see below), the callbacks flag the pending
updates as OPERATION_PL_SUCCEEDED or OPERATION_PL_FAILED
depending on the operation result.
When no pending update is still OPERATION_PL_PENDING, the
updates are either moved to the sync_repl queue (if the primary
update is OPERATION_PL_SUCCEEDED) or discarded (if it is
OPERATION_PL_FAILED).

(2) The postop plugin callbacks are now be_postop

https://pagure.io/389-ds-base/issue/51190

Reviewed by: Mark Reynolds, Simon Pichugin, William Brown (Thanks)

Platforms tested: F31
---
.../tests/suites/plugins/acceptance_test.py | 4 +-
.../tests/suites/syncrepl_plugin/basic_test.py | 553 +++++++++++++++++++++
ldap/servers/plugins/sync/sync.h | 37 ++
ldap/servers/plugins/sync/sync_init.c | 89 +++-
ldap/servers/plugins/sync/sync_persist.c | 296 ++++++++++-
ldap/servers/plugins/sync/sync_util.c | 56 ++-
6 files changed, 1003 insertions(+), 32 deletions(-)

diff --git a/dirsrvtests/tests/suites/plugins/acceptance_test.py b/dirsrvtests/tests/suites/plugins/acceptance_test.py
index e9e830d..9a5da25 100644
--- a/dirsrvtests/tests/suites/plugins/acceptance_test.py
+++ b/dirsrvtests/tests/suites/plugins/acceptance_test.py
@@ -12,6 +12,9 @@ Created on Dec 09, 2014
@author: mreynolds
'''
import logging
+import threading
+from ldap.syncrepl import SyncreplConsumer
+from ldap.ldapobject import ReconnectLDAPObject
import subprocess
import pytest
from lib389.utils import *
@@ -1546,7 +1549,6 @@ def test_referint(topo, args=None):
log.info('test_referint: PASS\n')
return

-
def test_retrocl(topo, args=None):
"""Test Retro Changelog basic functionality

diff --git a/dirsrvtests/tests/suites/syncrepl_plugin/basic_test.py b/dirsrvtests/tests/suites/syncrepl_plugin/basic_test.py
new file mode 100644
index 0000000..dc90944
--- /dev/null
+++ b/dirsrvtests/tests/suites/syncrepl_plugin/basic_test.py
@@ -0,0 +1,553 @@
+# --- BEGIN COPYRIGHT BLOCK ---
+# Copyright (C) 2020 William Brown <william@blackhats.net.au>
+# All rights reserved.
+#
+# License: GPL (version 3 or any later version).
+# See LICENSE for details.
+# --- END COPYRIGHT BLOCK ---
+
+import logging
+import ldap
+import time
+import threading
+from ldap.syncrepl import SyncreplConsumer
+from ldap.ldapobject import ReconnectLDAPObject
+import pytest
+from lib389 import DirSrv
+from lib389.idm.user import nsUserAccounts, UserAccounts
+from lib389.idm.group import Groups
+from lib389.topologies import topology_st as topology
+from lib389.paths import Paths
+from lib389.utils import ds_is_older
+from lib389.plugins import RetroChangelogPlugin, ContentSyncPlugin, AutoMembershipPlugin, MemberOfPlugin, MemberOfSharedConfig, AutoMembershipDefinitions
+from lib389._constants import *
+
+from . import ISyncRepl, syncstate_assert
+
+default_paths = Paths()
+pytestmark = pytest.mark.tier1
+
+log = logging.getLogger(__name__)
+
+def test_syncrepl_basic(topology):
+ """ Test basic functionality of the SyncRepl interface
+
+ :id: f9fea826-8ae2-412a-8e88-b8e0ba939b06
+
+ :setup: Standalone instance
+
+ :steps:
+ 1. Enable Retro Changelog
+ 2. Enable Syncrepl
+ 3. Run the syncstate test to check refresh, add, delete, mod.
+
+ :expectedresults:
+ 1. Success
+ 2. Success
+ 3. Success
+ """
+ st = topology.standalone
+ # Enable RetroChangelog.
+ rcl = RetroChangelogPlugin(st)
+ rcl.enable()
+ # Set the default targetid
+ rcl.replace('nsslapd-attribute', 'nsuniqueid:targetUniqueId')
+ # Enable sync repl
+ csp = ContentSyncPlugin(st)
+ csp.enable()
+ # Restart DS
+ st.restart()
+ # Setup the syncer
+ sync = ISyncRepl(st)
+ # Run the checks
+ syncstate_assert(st, sync)
+
+class TestSyncer(ReconnectLDAPObject, SyncreplConsumer):
+ def __init__(self, *args, **kwargs):
+ self.cookie = None
+ self.cookies = []
+ ldap.ldapobject.ReconnectLDAPObject.__init__(self, *args, **kwargs)
+
+ def syncrepl_set_cookie(self, cookie):
+ # extract the changenumber from the cookie
+ self.cookie = cookie
+ self.cookies.append(cookie.split('#')[2])
+ log.info("XXXX Set cookie: %s" % cookie)
+
+ def syncrepl_get_cookie(self):
+ log.info("XXXX Get cookie: %s" % self.cookie)
+ return self.cookie
+
+ def syncrepl_present(self, uuids, refreshDeletes=False):
+ log.info("XXXX syncrepl_present uuids %s %s" % ( uuids, refreshDeletes))
+
+ def syncrepl_delete(self, uuids):
+ log.info("XXXX syncrepl_delete uuids %s" % uuids)
+
+ def syncrepl_entry(self, dn, attrs, uuid):
+ log.info("XXXX syncrepl_entry dn %s" % dn)
+
+ def syncrepl_refreshdone(self):
+ log.info("XXXX syncrepl_refreshdone")
+
+ def get_cookies(self):
+ return self.cookies
+
+class Sync_persist(threading.Thread, ReconnectLDAPObject, SyncreplConsumer):
+ # This runs a sync_repl client in background
+ # it records a result containing the list of change numbers (from the cookies)
+ # in the order they are received
+ def __init__(self, inst):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.inst = inst
+ self.cookie = None
+ self.conn = inst.clone({SER_ROOT_DN: 'cn=directory manager', SER_ROOT_PW: 'password'})
+ self.filterstr = '(|(objectClass=groupofnames)(objectClass=person))'
+ self.attrs = [
+ 'objectclass',
+ 'cn',
+ 'displayname',
+ 'gidnumber',
+ 'givenname',
+ 'homedirectory',
+ 'mail',
+ 'member',
+ 'memberof',
+ 'sn',
+ 'uid',
+ 'uidnumber',
+ ]
+ self.conn.open()
+ self.result = []
+
+ def get_result(self):
+ # used to return the cookies list to the requestor
+ return self.result
+
+ def run(self):
+ """Start a sync repl client"""
+ ldap_connection = TestSyncer(self.inst.toLDAPURL())
+ ldap_connection.simple_bind_s('cn=directory manager', 'password')
+ ldap_search = ldap_connection.syncrepl_search(
+ "dc=example,dc=com",
+ ldap.SCOPE_SUBTREE,
+ mode='refreshAndPersist',
+ attrlist=self.attrs,
+ filterstr=self.filterstr,
+ cookie=None
+ )
+
+ try:
+ while ldap_connection.syncrepl_poll(all=1, msgid=ldap_search):
+ pass
+ except (ldap.SERVER_DOWN, ldap.CONNECT_ERROR) as e:
+ log.error('syncrepl_poll: LDAP error (%s)' % e)
+ self.result = ldap_connection.get_cookies()
+ log.info("ZZZ result = %s" % self.result)
+ self.conn.unbind()
+
+def test_sync_repl_cookie(topology, request):
+ """Test that sync_repl cookies progress in increasing order
+ when there are nested updates
+
+ :id: d7fbde25-5702-46ac-b38e-169d7a68e97c
+ :setup: Standalone Instance
+ :steps:
+ 1.: enable retroCL
+ 2.: configure retroCL to log nsuniqueid as targetUniqueId
+ 3.: enable content_sync plugin
+ 4.: enable automember
+ 5.: create (2) groups. Having several groups helps reproduce the concurrent updates problem.
+ 6.: configure automember to provision those groups with 'member'
+ 7.: enable and configure memberof plugin
+ 8.: enable plugin log level
+ 9.: restart the server
+ 10.: create a thread dedicated to run a sync repl client
+ 11.: Create (9) users that will generate nested updates (automember/memberof)
+ 12.: stop sync repl client and collect the list of cookie.change_no
+ 13.: check that cookies.change_no are in increasing order
+ :expectedresults:
+ 1.: succeeds
+ 2.: succeeds
+ 3.: succeeds
+ 4.: succeeds
+ 5.: succeeds
+ 6.: succeeds
+ 7.: succeeds
+ 8.: succeeds
+ 9.: succeeds
+ 10.: succeeds
+ 11.: succeeds
+ 12.: succeeds
+ 13.: succeeds
+ """
+ inst = topology[0]
+
+ # Enable/configure retroCL
+ plugin = RetroChangelogPlugin(inst)
+ plugin.disable()
+ plugin.enable()
+ plugin.set('nsslapd-attribute', 'nsuniqueid:targetuniqueid')
+
+ # Enable sync plugin
+ plugin = ContentSyncPlugin(inst)
+ plugin.enable()
+
+ # Enable automember
+ plugin = AutoMembershipPlugin(inst)
+ plugin.disable()
+ plugin.enable()
+
+ # Add the automember group
+ groups = Groups(inst, DEFAULT_SUFFIX)
+ group = []
+ for i in range(1,3):
+ group.append(groups.create(properties={'cn': 'group%d' % i}))
+
+ # Add the automember config entry
+ am_configs = AutoMembershipDefinitions(inst)
+ for g in group:
+ am_config = am_configs.create(properties={'cn': 'config %s' % g.get_attr_val_utf8('cn'),
+ 'autoMemberScope': DEFAULT_SUFFIX,
+ 'autoMemberFilter': 'uid=*',
+ 'autoMemberDefaultGroup': g.dn,
+ 'autoMemberGroupingAttr': 'member:dn'})
+
+ # Enable and configure memberof plugin
+ plugin = MemberOfPlugin(inst)
+ plugin.disable()
+ plugin.enable()
+
+ plugin.replace_groupattr('member')
+
+ memberof_config = MemberOfSharedConfig(inst, 'cn=memberOf config,{}'.format(DEFAULT_SUFFIX))
+ memberof_config.create(properties={'cn': 'memberOf config',
+ 'memberOfGroupAttr': 'member',
+ 'memberOfAttr': 'memberof'})
+ # Enable plugin log level (useful for debug)
+ inst.setLogLevel(65536)
+ inst.restart()
+
+ # create a sync repl client and wait 5 seconds to be sure it is running
+ sync_repl = Sync_persist(inst)
+ sync_repl.start()
+ time.sleep(5)
+
+ # create users for which automember/memberof will generate nested updates
+ users = UserAccounts(inst, DEFAULT_SUFFIX)
+ users_set = []
+ for i in range(10001, 10010):
+ users_set.append(users.create_test_user(uid=i))
+
+ # stop the server to get the sync_repl result set (exit from while loop).
+ # This is the only way I found to achieve that,
+ # and wait a bit to let sync_repl thread time to set its result before fetching it.
+ inst.stop()
+ time.sleep(10)
+ cookies = sync_repl.get_result()
+
+ # check that the cookies are increasing and in an acceptable range (0..1000)
+ assert len(cookies) > 0
+ prev = 0
+ for cookie in cookies:
+ log.info('Check cookie %s' % cookie)
+
+ assert int(cookie) > 0
+ assert int(cookie) < 1000
+ assert int(cookie) > prev
+ prev = int(cookie)
+ sync_repl.join()
+ log.info('test_sync_repl_cookie: PASS\n')
+
+ def fin():
+ inst.restart()
+ for user in users_set:
+ try:
+ user.delete()
+ except:
+ pass
+ for g in group:
+ try:
+ g.delete()
+ except:
+ pass
+
+ request.addfinalizer(fin)
+
+ return
+
+def test_sync_repl_cookie_add_del(topology, request):
+ """Test that sync_repl cookies progress in increasing order
+ when there are adds and deletes
+
+ :id: 83e11038-6ed0-4a5b-ac77-e44887ab11e3
+ :setup: Standalone Instance
+ :steps:
+ 1.: enable retroCL
+ 2.: configure retroCL to log nsuniqueid as targetUniqueId
+ 3.: enable content_sync plugin
+ 4.: enable automember
+ 5.: create (2) groups. Having several groups helps reproduce the concurrent updates problem.
+ 6.: configure automember to provision those groups with 'member'
+ 7.: enable and configure memberof plugin
+ 8.: enable plugin log level
+ 9.: restart the server
+ 10.: create a thread dedicated to run a sync repl client
+ 11.: Create (3) users that will generate nested updates (automember/memberof)
+ 12.: Delete (3) users
+ 13.: stop sync repl client and collect the list of cookie.change_no
+ 14.: check that cookies.change_no are in increasing order
+ :expectedresults:
+ 1.: succeeds
+ 2.: succeeds
+ 3.: succeeds
+ 4.: succeeds
+ 5.: succeeds
+ 6.: succeeds
+ 7.: succeeds
+ 8.: succeeds
+ 9.: succeeds
+ 10.: succeeds
+ 11.: succeeds
+ 12.: succeeds
+ 13.: succeeds
+ 14.: succeeds
+ """
+ inst = topology[0]
+
+ # Enable/configure retroCL
+ plugin = RetroChangelogPlugin(inst)
+ plugin.disable()
+ plugin.enable()
+ plugin.set('nsslapd-attribute', 'nsuniqueid:targetuniqueid')
+
+ # Enable sync plugin
+ plugin = ContentSyncPlugin(inst)
+ plugin.enable()
+
+ # Enable automember
+ plugin = AutoMembershipPlugin(inst)
+ plugin.disable()
+ plugin.enable()
+
+ # Add the automember group
+ groups = Groups(inst, DEFAULT_SUFFIX)
+ group = []
+ for i in range(1,3):
+ group.append(groups.create(properties={'cn': 'group%d' % i}))
+
+ # Add the automember config entry
+ am_configs = AutoMembershipDefinitions(inst)
+ for g in group:
+ am_config = am_configs.create(properties={'cn': 'config %s' % g.get_attr_val_utf8('cn'),
+ 'autoMemberScope': DEFAULT_SUFFIX,
+ 'autoMemberFilter': 'uid=*',
+ 'autoMemberDefaultGroup': g.dn,
+ 'autoMemberGroupingAttr': 'member:dn'})
+
+ # Enable and configure memberof plugin
+ plugin = MemberOfPlugin(inst)
+ plugin.disable()
+ plugin.enable()
+
+ plugin.replace_groupattr('member')
+
+ memberof_config = MemberOfSharedConfig(inst, 'cn=memberOf config,{}'.format(DEFAULT_SUFFIX))
+ memberof_config.create(properties={'cn': 'memberOf config',
+ 'memberOfGroupAttr': 'member',
+ 'memberOfAttr': 'memberof'})
+ # Enable plugin log level (useful for debug)
+ inst.setLogLevel(65536)
+ inst.restart()
+
+ # create a sync repl client and wait 5 seconds to be sure it is running
+ sync_repl = Sync_persist(inst)
+ sync_repl.start()
+ time.sleep(5)
+
+ # create users for which automember/memberof will generate nested updates
+ users = UserAccounts(inst, DEFAULT_SUFFIX)
+ users_set = []
+ for i in range(10001, 10004):
+ users_set.append(users.create_test_user(uid=i))
+
+ time.sleep(10)
+ # delete users for which automember/memberof will generate nested updates
+ for user in users_set:
+ user.delete()
+ # stop the server to get the sync_repl result set (exit from while loop).
+ # This is the only way I found to achieve that,
+ # and wait a bit to let sync_repl thread time to set its result before fetching it.
+ inst.stop()
+ cookies = sync_repl.get_result()
+
+ # check that the cookies are increasing and in an acceptable range (0..1000)
+ assert len(cookies) > 0
+ prev = 0
+ for cookie in cookies:
+ log.info('Check cookie %s' % cookie)
+
+ assert int(cookie) > 0
+ assert int(cookie) < 1000
+ assert int(cookie) > prev
+ prev = int(cookie)
+ sync_repl.join()
+ log.info('test_sync_repl_cookie_add_del: PASS\n')
+
+ def fin():
+ inst.restart()
+ for g in group:
+ try:
+ g.delete()
+ except:
+ pass
+
+ request.addfinalizer(fin)
+
+ return
+
+def test_sync_repl_cookie_with_failure(topology, request):
+ """Test that sync_repl cookies progress in the right order
+ when there is a failure in nested updates
+
+ :id: e0103448-170e-4080-8f22-c34606447ce2
+ :setup: Standalone Instance
+ :steps:
+ 1.: enable retroCL
+ 2.: configure retroCL to log nsuniqueid as targetUniqueId
+ 3.: enable content_sync plugin
+ 4.: enable automember
+ 5.: create (4) groups.
+ make group2 groupOfUniqueNames so the automember
+ will fail to add 'member' (uniqueMember expected)
+ 6.: configure automember to provision those groups with 'member'
+ 7.: enable and configure memberof plugin
+ 8.: enable plugin log level
+ 9.: restart the server
+ 10.: create a thread dedicated to run a sync repl client
+ 11.: Create a group that will be the only update received by sync repl client
+ 12.: Create (9) users that will generate nested updates (automember/memberof)
+ 13.: stop sync repl client and collect the list of cookie.change_no
+ 14.: check that the list of cookie.change_no contains only the group 'step 11'
+ :expectedresults:
+ 1.: succeeds
+ 2.: succeeds
+ 3.: succeeds
+ 4.: succeeds
+ 5.: succeeds
+ 6.: succeeds
+ 7.: succeeds
+ 8.: succeeds
+ 9.: succeeds
+ 10.: succeeds
+ 11.: succeeds
+ 12.: Fails (expected)
+ 13.: succeeds
+ 14.: succeeds
+ """
+ inst = topology[0]
+
+ # Enable/configure retroCL
+ plugin = RetroChangelogPlugin(inst)
+ plugin.disable()
+ plugin.enable()
+ plugin.set('nsslapd-attribute', 'nsuniqueid:targetuniqueid')
+
+ # Enable sync plugin
+ plugin = ContentSyncPlugin(inst)
+ plugin.enable()
+
+ # Enable automember
+ plugin = AutoMembershipPlugin(inst)
+ plugin.disable()
+ plugin.enable()
+
+ # Add the automember group
+ groups = Groups(inst, DEFAULT_SUFFIX)
+ group = []
+ for i in range(1,5):
+ group.append(groups.create(properties={'cn': 'group%d' % i}))
+
+ # Set group2 as a groupOfUniqueNames so that automember will fail to update that group
+ # This will trigger a failure in internal MOD and a failure to add member
+ group[1].replace('objectclass', 'groupOfUniqueNames')
+
+ # Add the automember config entry
+ am_configs = AutoMembershipDefinitions(inst)
+ for g in group:
+ am_config = am_configs.create(properties={'cn': 'config %s' % g.get_attr_val_utf8('cn'),
+ 'autoMemberScope': DEFAULT_SUFFIX,
+ 'autoMemberFilter': 'uid=*',
+ 'autoMemberDefaultGroup': g.dn,
+ 'autoMemberGroupingAttr': 'member:dn'})
+
+ # Enable and configure memberof plugin
+ plugin = MemberOfPlugin(inst)
+ plugin.disable()
+ plugin.enable()
+
+ plugin.replace_groupattr('member')
+
+ memberof_config = MemberOfSharedConfig(inst, 'cn=memberOf config,{}'.format(DEFAULT_SUFFIX))
+ memberof_config.create(properties={'cn': 'memberOf config',
+ 'memberOfGroupAttr': 'member',
+ 'memberOfAttr': 'memberof'})
+
+ # Enable plugin log level (useful for debug)
+ inst.setLogLevel(65536)
+ inst.restart()
+
+ # create a sync repl client and wait 5 seconds to be sure it is running
+ sync_repl = Sync_persist(inst)
+ sync_repl.start()
+ time.sleep(5)
+
+ # Add a test group just to check that sync_repl receives only one update
+ group.append(groups.create(properties={'cn': 'group%d' % 10}))
+
+ # create users for which automember/memberof will generate nested updates
+ users = UserAccounts(inst, DEFAULT_SUFFIX)
+ users_set = []
+ for i in range(1000,1010):
+ try:
+ users_set.append(users.create_test_user(uid=i))
+ # Automember should fail to add uid=1000 in group2
+ assert(False)
+ except ldap.UNWILLING_TO_PERFORM:
+ pass
+
+ # stop the server to get the sync_repl result set (exit from while loop).
+ # This is the only way I found to achieve that,
+ # and wait a bit to let sync_repl thread time to set its result before fetching it.
+ inst.stop()
+ time.sleep(10)
+ cookies = sync_repl.get_result()
+
+ # checking that the cookie list contains only one entry
+ assert len(cookies) == 1
+ prev = 0
+ for cookie in cookies:
+ log.info('Check cookie %s' % cookie)
+
+ assert int(cookie) > 0
+ assert int(cookie) < 1000
+ assert int(cookie) > prev
+ prev = int(cookie)
+ sync_repl.join()
+ log.info('test_sync_repl_cookie_with_failure: PASS\n')
+
+ def fin():
+ inst.restart()
+ for user in users_set:
+ try:
+ user.delete()
+ except:
+ pass
+ for g in group:
+ try:
+ g.delete()
+ except:
+ pass
+
+ request.addfinalizer(fin)
diff --git a/ldap/servers/plugins/sync/sync.h b/ldap/servers/plugins/sync/sync.h
index a6b54a6..8c367c3 100644
--- a/ldap/servers/plugins/sync/sync.h
+++ b/ldap/servers/plugins/sync/sync.h
@@ -16,6 +16,8 @@

#include <stdio.h>
#include <string.h>
+#include <stdbool.h>
+#include "slap.h"
#include "slapi-plugin.h"
#include "slapi-private.h"

@@ -25,6 +27,8 @@
#define SYNC_PREOP_DESC "content-sync-preop-subplugin"
#define SYNC_POSTOP_DESC "content-sync-postop-subplugin"
#define SYNC_INT_POSTOP_DESC "content-sync-int-postop-subplugin"
+#define SYNC_BETXN_PREOP_DESC "content-sync-betxn-preop-subplugin"
+#define SYNC_BE_POSTOP_DESC "content-sync-be-post-subplugin"

#define OP_FLAG_SYNC_PERSIST 0x01

@@ -64,6 +68,37 @@ typedef struct sync_callback
Sync_UpdateNode *cb_updates;
} Sync_CallBackData;

+/* Pending list flags
+ * OPERATION_PL_PENDING: operation not yet completed
+ * OPERATION_PL_SUCCEEDED: operation completed successfully
+ * OPERATION_PL_FAILED: operation completed and failed
+ * OPERATION_PL_IGNORED: operation completed but with an undefined status
+ */
+typedef enum _pl_flags {
+ OPERATION_PL_PENDING = 1,
+ OPERATION_PL_SUCCEEDED = 2,
+ OPERATION_PL_FAILED = 3,
+ OPERATION_PL_IGNORED = 4
+} pl_flags_t;
+
+/* Pending list operations.
+ * It contains a list ('next') of nested operations, kept in the
+ * same order in which the server applied the operations.
+ * see https://www.port389.org/docs/389ds/design/content-synchronization-plugin.html#queue-and-pending-list
+ */
+typedef struct OPERATION_PL_CTX
+{
+ Operation *op; /* Pending operation, should not be freed as it belongs to the pblock */
+ pl_flags_t flags; /* operation state: PENDING until be_postop records the result */
+ Slapi_Entry *entry; /* entry to be stored in the enqueued node. 1st arg of sync_queue_change */
+ Slapi_Entry *eprev; /* pre-entry to be stored in the enqueued node. 2nd arg of sync_queue_change */
+ ber_int_t chgtype; /* change type to be stored in the enqueued node. 3rd arg of sync_queue_change */
+ struct OPERATION_PL_CTX *next; /* list of nested operation, the head of the list is the primary operation */
+} OPERATION_PL_CTX_T;
+
+OPERATION_PL_CTX_T * get_thread_primary_op(void);
+void set_thread_primary_op(OPERATION_PL_CTX_T *op);
+
int sync_register_operation_extension(void);
int sync_unregister_operation_entension(void);

@@ -75,6 +110,7 @@ int sync_del_persist_post_op(Slapi_PBlock *pb);
int sync_mod_persist_post_op(Slapi_PBlock *pb);
int sync_modrdn_persist_post_op(Slapi_PBlock *pb);
int sync_add_persist_post_op(Slapi_PBlock *pb);
+int sync_update_persist_betxn_pre_op(Slapi_PBlock *pb);

int sync_parse_control_value(struct berval *psbvp, ber_int_t *mode, int *reload, char **cookie);
int sync_create_state_control(Slapi_Entry *e, LDAPControl **ctrlp, int type, Sync_Cookie *cookie);
@@ -181,3 +217,4 @@ typedef struct sync_op_info
Sync_Cookie *cookie; /* cookie to add in control */
PRThread *tid; /* thread for persistent phase */
} SyncOpInfo;
+
diff --git a/ldap/servers/plugins/sync/sync_init.c b/ldap/servers/plugins/sync/sync_init.c
index 2843b88..4b802d6 100644
--- a/ldap/servers/plugins/sync/sync_init.c
+++ b/ldap/servers/plugins/sync/sync_init.c
@@ -14,7 +14,10 @@ static int sync_start(Slapi_PBlock *pb);
static int sync_close(Slapi_PBlock *pb);
static int sync_preop_init(Slapi_PBlock *pb);
static int sync_postop_init(Slapi_PBlock *pb);
-static int sync_internal_postop_init(Slapi_PBlock *pb);
+static int sync_be_postop_init(Slapi_PBlock *pb);
+static int sync_betxn_preop_init(Slapi_PBlock *pb);
+
+static PRUintn thread_primary_op;

int
sync_init(Slapi_PBlock *pb)
@@ -80,17 +83,32 @@ sync_init(Slapi_PBlock *pb)
}

if (rc == 0) {
- char *plugin_type = "internalpostoperation";
+ char *plugin_type = "betxnpreoperation";
/* the config change checking post op */
if (slapi_register_plugin(plugin_type,
1, /* Enabled */
"sync_init", /* this function desc */
- sync_internal_postop_init, /* init func for post op */
- SYNC_INT_POSTOP_DESC, /* plugin desc */
+ sync_betxn_preop_init, /* init func for betxn pre op */
+ SYNC_BETXN_PREOP_DESC, /* plugin desc */
NULL,
plugin_identity)) {
slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM,
- "sync_init - Failed to register internal postop plugin\n");
+ "sync_init - Failed to register be_txn_pre_op plugin\n");
+ rc = 1;
+ }
+ }
+ if (rc == 0) {
+ char *plugin_type = "bepostoperation";
+ /* the be post op callbacks (flag and flush the pending list) */
+ if (slapi_register_plugin(plugin_type,
+ 1, /* Enabled */
+ "sync_init", /* this function desc */
+ sync_be_postop_init, /* init func for be_post op */
+ SYNC_BE_POSTOP_DESC, /* plugin desc */
+ NULL,
+ plugin_identity)) {
+ slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM,
+ "sync_init - Failed to register be_post_op plugin\n");
rc = 1;
}
}
@@ -114,25 +132,31 @@ static int
sync_postop_init(Slapi_PBlock *pb)
{
int rc;
- rc = slapi_pblock_set(pb, SLAPI_PLUGIN_POST_ADD_FN, (void *)sync_add_persist_post_op);
- rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_POST_DELETE_FN, (void *)sync_del_persist_post_op);
- rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_POST_MODIFY_FN, (void *)sync_mod_persist_post_op);
- rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_POST_MODRDN_FN, (void *)sync_modrdn_persist_post_op);
- rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_POST_SEARCH_FN, (void *)sync_srch_refresh_post_search);
+ rc = slapi_pblock_set(pb, SLAPI_PLUGIN_POST_SEARCH_FN, (void *)sync_srch_refresh_post_search);
return (rc);
}

static int
-sync_internal_postop_init(Slapi_PBlock *pb)
+sync_be_postop_init(Slapi_PBlock *pb)
{
int rc;
- rc = slapi_pblock_set(pb, SLAPI_PLUGIN_INTERNAL_POST_ADD_FN, (void *)sync_add_persist_post_op);
- rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_INTERNAL_POST_DELETE_FN, (void *)sync_del_persist_post_op);
- rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_INTERNAL_POST_MODIFY_FN, (void *)sync_mod_persist_post_op);
- rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_INTERNAL_POST_MODRDN_FN, (void *)sync_modrdn_persist_post_op);
+ rc = slapi_pblock_set(pb, SLAPI_PLUGIN_BE_POST_ADD_FN, (void *)sync_add_persist_post_op);
+ rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_POST_DELETE_FN, (void *)sync_del_persist_post_op);
+ rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_POST_MODIFY_FN, (void *)sync_mod_persist_post_op);
+ rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_POST_MODRDN_FN, (void *)sync_modrdn_persist_post_op);
return (rc);
}

+static int
+sync_betxn_preop_init(Slapi_PBlock *pb)
+{
+ int rc;
+ rc = slapi_pblock_set(pb, SLAPI_PLUGIN_BE_TXN_PRE_ADD_FN, (void *)sync_update_persist_betxn_pre_op);
+ rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_TXN_PRE_DELETE_FN, (void *)sync_update_persist_betxn_pre_op);
+ rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_TXN_PRE_MODIFY_FN, (void *)sync_update_persist_betxn_pre_op);
+ rc |= slapi_pblock_set(pb, SLAPI_PLUGIN_BE_TXN_PRE_MODRDN_FN, (void *)sync_update_persist_betxn_pre_op);
+ return (rc);
+}
/*
sync_start
--------------
@@ -156,6 +180,12 @@ sync_start(Slapi_PBlock *pb)
"sync_start - Unable to get arguments\n");
return (-1);
}
+ /* It registers a per-thread 'thread_primary_op' variable that is
+ * a list of pending operations. For a simple operation, this list
+ * only contains one operation. For nested operations, the list contains
+ * them in the order in which they were applied
+ */
+ PR_NewThreadPrivateIndex(&thread_primary_op, NULL);
sync_persist_initialize(argc, argv);

return (0);
@@ -174,3 +204,32 @@ sync_close(Slapi_PBlock *pb __attribute__((unused)))

return (0);
}
+
+/* Return the head of the operations list
+ * the head is the primary operation.
+ * The list is private to that thread and contains
+ * all nested operations applied by the thread.
+ */
+OPERATION_PL_CTX_T *
+get_thread_primary_op(void)
+{
+ OPERATION_PL_CTX_T *prim_op = NULL;
+ if (thread_primary_op) {
+ prim_op = (OPERATION_PL_CTX_T *)PR_GetThreadPrivate(thread_primary_op);
+ }
+
+ return prim_op;
+}
+
+/* It is set to a non-NULL op for a primary operation, and reset
+ * to NULL once the whole pending list has been flushed.
+ * The list is flushed when no more operations (in that list) are
+ * pending (OPERATION_PL_PENDING).
+ */
+void
+set_thread_primary_op(OPERATION_PL_CTX_T *op)
+{
+ if (thread_primary_op) {
+ PR_SetThreadPrivate(thread_primary_op, (void *)op);
+ }
+}
\ No newline at end of file
diff --git a/ldap/servers/plugins/sync/sync_persist.c b/ldap/servers/plugins/sync/sync_persist.c
index 880163a..5724d7d 100644
--- a/ldap/servers/plugins/sync/sync_persist.c
+++ b/ldap/servers/plugins/sync/sync_persist.c
@@ -6,6 +6,9 @@
* See LICENSE for details.
* END COPYRIGHT BLOCK **/

+#include <nspr4/prlog.h>
+#include <stdint.h>
+
#include "sync.h"

/* Main list of established persistent synchronizaton searches */
@@ -29,7 +32,7 @@ static PRUint64 thread_count = 0;
static int sync_add_request(SyncRequest *req);
static void sync_remove_request(SyncRequest *req);
static SyncRequest *sync_request_alloc(void);
-void sync_queue_change(Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t chgtype);
+void sync_queue_change(OPERATION_PL_CTX_T *operation);
static void sync_send_results(void *arg);
static void sync_request_wakeup_all(void);
static void sync_node_free(SyncQueueNode **node);
@@ -37,17 +40,244 @@ static void sync_node_free(SyncQueueNode **node);
static int sync_acquire_connection(Slapi_Connection *conn);
static int sync_release_connection(Slapi_PBlock *pb, Slapi_Connection *conn, Slapi_Operation *op, int release);

+/* This routine appends the operation at the end of the
+ * per-thread pending list of nested operations.
+ * Because this is a betxn_preop, the pending list has the same
+ * order in which the server received the operations
+ */
+int
+sync_update_persist_betxn_pre_op(Slapi_PBlock *pb)
+{
+ OPERATION_PL_CTX_T *prim_op;
+ OPERATION_PL_CTX_T *new_op;
+ Slapi_DN *sdn;
+
+ if (!SYNC_IS_INITIALIZED()) {
+ /* not initialized if sync plugin is not started */
+ return 0;
+ }
+
+ /* Create a new pending operation node */
+ new_op = (OPERATION_PL_CTX_T *)slapi_ch_calloc(1, sizeof(OPERATION_PL_CTX_T));
+ new_op->flags = OPERATION_PL_PENDING;
+ slapi_pblock_get(pb, SLAPI_OPERATION, &new_op->op);
+ slapi_pblock_get(pb, SLAPI_TARGET_SDN, &sdn);
+
+ prim_op = get_thread_primary_op();
+ if (prim_op) {
+ /* A primary operation already exists, so the current
+ * operation is a nested one that we need to register at the end
+ * of the pending nested operations
+ */
+ OPERATION_PL_CTX_T *current_op;
+ for (current_op = prim_op; current_op->next; current_op = current_op->next);
+ current_op->next = new_op;
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_update_persist_betxn_pre_op - nested operation targets "
+ "\"%s\" (0x%lx)\n",
+ slapi_sdn_get_dn(sdn), (ulong) new_op->op);
+ } else {
+ /* The current operation is the first/primary one in the txn,
+ * so register it directly in the thread private data (head)
+ */
+ set_thread_primary_op(new_op);
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_update_persist_betxn_pre_op - primary operation targets "
+ "\"%s\" (0x%lx)\n",
+ slapi_sdn_get_dn(sdn), (ulong) new_op->op);
+ }
+ return 0;
+}
+
+/* This operation cannot be processed by the sync_repl listener because
+ * of an internal problem. For example, the POST entry does not exist
+ */
+static void
+ignore_op_pl(Operation *op)
+{
+ OPERATION_PL_CTX_T *prim_op, *curr_op;
+ prim_op = get_thread_primary_op();
+
+ for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
+ if ((curr_op->op == op) &&
+ (curr_op->flags == OPERATION_PL_PENDING)) { /* If by any chance the same operation structure was reused in consecutive updates,
+ * we cannot rely on the 'op' value alone
+ */
+ slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "ignore_op_pl - ignoring operation (0x%lx) from the pending list\n",
+ (ulong) op);
+ curr_op->flags = OPERATION_PL_IGNORED;
+ return;
+ }
+ }
+ slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "ignore_op_pl - cannot retrieve operation (0x%lx) in the pending list\n",
+ (ulong) op);
+}
+
+/* This is a generic function that is called by the be_postop callbacks of this plugin.
+ * For the given operation (pb->pb_op) it records in the pending list the state
+ * of the completed operation.
+ * When all operations are completed, if the primary operation succeeded it
+ * flushes (enqueues) the operations to the sync repl queue(s); otherwise it just
+ * frees the pending list (skipping the enqueue).
+ */
+static void
+sync_update_persist_op(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t op_tag, char *label)
+{
+ OPERATION_PL_CTX_T *prim_op = NULL, *curr_op;
+ Operation *pb_op;
+ Slapi_DN *sdn;
+ int32_t rc;
+
+ if (!SYNC_IS_INITIALIZED()) {
+ /* not initialized if sync plugin is not started */
+ return;
+ }
+ slapi_pblock_get(pb, SLAPI_OPERATION, &pb_op);
+ slapi_pblock_get(pb, SLAPI_TARGET_SDN, &sdn);
+
+ if (NULL == e) {
+ /* Ignore this operation (for example case of failure of the operation) */
+ ignore_op_pl(pb_op);
+ return;
+ }
+
+ /* Retrieve the result of the operation */
+ if (slapi_op_internal(pb)) {
+ slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc);
+ if (0 != rc) {
+ /* The internal operation did not succeed */
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "internal operation Failed (0x%lx) rc=%d\n",
+ (ulong) pb_op, rc);
+ }
+ } else {
+ slapi_pblock_get(pb, SLAPI_PLUGIN_OPRETURN, &rc);
+ if (0 != rc) {
+ /* The operation did not succeed */
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "direct operation Failed (0x%lx) rc=%d\n",
+ (ulong) pb_op, rc);
+ }
+ }
+
+
+ prim_op = get_thread_primary_op();
+ PR_ASSERT(prim_op);
+ /* First mark the operation as completed/failed and record
+ * the parameters to be used once the operation is pushed
+ * onto the listeners' queue
+ */
+ for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
+ if ((curr_op->op == pb_op) &&
+ (curr_op->flags == OPERATION_PL_PENDING)) { /* If by any chance the same operation structure was reused in consecutive updates,
+ * we cannot rely on the 'op' value alone
+ */
+ if (rc == LDAP_SUCCESS) {
+ curr_op->flags = OPERATION_PL_SUCCEEDED;
+ curr_op->entry = e ? slapi_entry_dup(e) : NULL;
+ curr_op->eprev = eprev ? slapi_entry_dup(eprev) : NULL;
+ curr_op->chgtype = op_tag;
+ } else {
+ curr_op->flags = OPERATION_PL_FAILED;
+ }
+ break;
+ }
+ }
+ if (!curr_op) {
+ slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "%s - operation not found on the pending list\n", label);
+ PR_ASSERT(curr_op);
+ }
+
+#if DEBUG
+ /* dump the pending queue */
+ for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
+ char *flags_str;
+ char * entry_str;
+
+ if (curr_op->entry) {
+ entry_str = slapi_entry_get_dn(curr_op->entry);
+ } else if (curr_op->eprev){
+ entry_str = slapi_entry_get_dn(curr_op->eprev);
+ } else {
+ entry_str = "unknown";
+ }
+ switch (curr_op->flags) {
+ case OPERATION_PL_SUCCEEDED:
+ flags_str = "succeeded";
+ break;
+ case OPERATION_PL_FAILED:
+ flags_str = "failed";
+ break;
+ case OPERATION_PL_IGNORED:
+ flags_str = "ignored";
+ break;
+ case OPERATION_PL_PENDING:
+ flags_str = "pending";
+ break;
+ default:
+ flags_str = "unknown";
+ break;
+
+
+ }
+ slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "dump pending list(0x%lx) %s %s\n",
+ (ulong) curr_op->op, entry_str, flags_str);
+ }
+
