# Source code listing from the zope-ldapmultiplugins package.
#
# File: LDAPMultiPlugin.py

##############################################################################
#
# LDAPMultiPlugin   Shim to use the LDAPUserFolder with the
#                   PluggableAuthenticationService
#
# This software is governed by a license. See
# LICENSE.txt for the terms of this license.
#
##############################################################################

# Module documentation string, assigned explicitly.
__doc__     = """ LDAPUserFolder shim module """
# The [11:-2] slice strips the leading '$Revision: ' and the trailing ' $'
# from the revision string, leaving just the number (here '1361').
__version__ = '$Revision: 1361 $'[11:-2]

# General Python imports
import copy
import logging
import os
import operator
from urllib import quote_plus

# Zope imports
from Acquisition import aq_base
from Globals import InitializeClass
from Globals import DTMLFile
from Globals import package_home
from AccessControl import ClassSecurityInfo
from Products.LDAPUserFolder import manage_addLDAPUserFolder

from Products.PluggableAuthService.interfaces.plugins import \
     IUserEnumerationPlugin, IGroupsPlugin, IGroupEnumerationPlugin, \
     IRoleEnumerationPlugin
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.utils import implementedBy

from LDAPPluginBase import LDAPPluginBase

# Module-level logger; used by the enumerate* methods to report cache hits.
logger = logging.getLogger('event.LDAPMultiPlugin')
# Path of the 'dtml' directory below the location package_home() reports
# for this module's globals.
_dtmldir = os.path.join(package_home(globals()), 'dtml')
# DTML template 'addLDAPMultiPlugin' loaded from _dtmldir -- presumably the
# add form that submits to manage_addLDAPMultiPlugin (TODO confirm against
# the product's __init__ registration).
addLDAPMultiPluginForm = DTMLFile('addLDAPMultiPlugin', _dtmldir)

def manage_addLDAPMultiPlugin(self, id, title, LDAP_server, login_attr,
                              uid_attr, users_base, users_scope, roles,
                              groups_base, groups_scope, binduid, bindpwd,
                              binduid_usage=1, rdn_attr='cn', local_groups=0,
                              use_ssl=0, encryption='SHA', read_only=0,
                              REQUEST=None):
    """ Factory method to instantiate a LDAPMultiPlugin

    Creates a LDAPMultiPlugin with the given id and title inside the
    container, adds a LDAPUserFolder ('acl_users') inside the plugin,
    registers the LDAP server on it and applies the remaining settings
    through the user folder's manage_edit.

    LDAP_server is either 'host' or 'host:port'. Without an explicit
    port, '636' is used when use_ssl is true and '389' otherwise.

    If REQUEST is given the browser is redirected to the container's
    manage_main; nothing is returned.
    """
    # Work on the real container (the PluggableAuthService object) rather
    # than on whatever wrapper the factory was invoked through.
    container = self.this()

    # The LDAP delegate expects exactly 1 or 0 here, not merely a value
    # that happens to be true or false.
    use_ssl = int(bool(use_ssl))

    # Create the folderish adapter, store it, and fetch the stored object
    # back from the unwrapped container.
    plugin = LDAPMultiPlugin(id, title=title)
    container._setObject(id, plugin)
    plugin = getattr(aq_base(container), id)
    plugin_base = aq_base(plugin)

    # Put the "real" LDAPUserFolder inside the plugin
    manage_addLDAPUserFolder(plugin)
    user_folder = plugin_base.acl_users

    # Split an optional ':port' suffix off the server specification and
    # fall back to the conventional port for the chosen transport.
    server_parts = LDAP_server.split(':')
    host = server_parts[0]
    if len(server_parts) > 1:
        port = server_parts[1]
    elif use_ssl:
        port = '636'
    else:
        port = '389'

    user_folder.manage_addServer(host, port=port, use_ssl=use_ssl)
    user_folder.manage_edit(
        title,
        login_attr,
        uid_attr,
        users_base,
        users_scope,
        roles,
        groups_base,
        groups_scope,
        binduid,
        bindpwd,
        binduid_usage=binduid_usage,
        rdn_attr=rdn_attr,
        local_groups=local_groups,
        encryption=encryption,
        read_only=read_only,
        REQUEST=None,
    )

    # Clean out the __allow_groups__ bit because it is not needed here
    # and potentially harmful.
    if hasattr(plugin_base, '__allow_groups__'):
        del plugin_base.__allow_groups__

    if REQUEST is None:
        return

    REQUEST.RESPONSE.redirect('%s/manage_main' % container.absolute_url())



class LDAPMultiPlugin(LDAPPluginBase):
    """ The adapter that mediates between the PAS and the LDAPUserFolder

    Every method delegates to the LDAPUserFolder obtained from
    self._getLDAPUserFolder() and returns an empty tuple when that
    lookup yields None. The enumerate* methods additionally consult and
    fill the ZCacheable cache, keyed on the full set of call arguments.
    """
    security = ClassSecurityInfo()
    meta_type = 'LDAP Multi Plugin'

    security.declarePrivate('getGroupsForPrincipal')
    def getGroupsForPrincipal(self, user, request=None, attr=None):
        """ Fulfill GroupsPlugin requirements

        Returns a tuple holding the first item of every entry that the
        LDAPUserFolder's getGroups() reports for the user's DN. 'attr'
        is handed through to getGroups(); 'request' is not used.

        An empty tuple is returned when there is no LDAPUserFolder, when
        the user id cannot be demangled, or when the LDAPUserFolder does
        not find the user.
        """
        acl = self._getLDAPUserFolder()

        if acl is None:
            return ()

        unmangled_userid = self._demangle(user.getId())
        if unmangled_userid is None:
            return ()

        ldap_user = acl.getUserById(unmangled_userid)

        if ldap_user is None:
            return ()

        groups = acl.getGroups(ldap_user.getUserDN(), attr=attr)

        return tuple([x[0] for x in groups])


    security.declarePrivate('enumerateUsers')
    def enumerateUsers( self
                      , id=None
                      , login=None
                      , exact_match=0
                      , sort_by=None
                      , max_results=None
                      , **kw
                      ):
        """ Fulfill the UserEnumerationPlugin requirements

        Returns a tuple of mappings, each carrying at least the keys
        'id', 'login', 'pluginid' and 'editurl'.

        o With exact_match and an id or login, a single user is looked
          up directly (id takes precedence over login); sort_by and
          max_results are not applied in this case.

        o Otherwise the LDAPUserFolder is searched with id, login and
          any extra keyword arguments as criteria. Records are
          de-duplicated on their DN, optionally sorted
          case-insensitively on 'sort_by' and cut down to at most
          'max_results' entries when that is an integer.

        Returns an empty tuple when no LDAPUserFolder is available.
        """
        view_name = self.getId() + '_enumerateUsers'

        # The cache key is built from the call arguments only. It must
        # stay untouched so that the ZCacheable_set at the end stores
        # the result under the same keywords ZCacheable_get looks up.
        cache_keywords = {'id':id, 'login':login, 'exact_match':exact_match,
                          'sort_by':sort_by, 'max_results':max_results}
        cache_keywords.update(kw)

        cached_info = self.ZCacheable_get(view_name = view_name,
                                          keywords = cache_keywords,
                                          default = None)

        if cached_info is not None:
            logger.debug('returning cached results from enumerateUsers')
            return cached_info

        # Bail out before touching the user folder if there is none.
        acl = self._getLDAPUserFolder()

        if acl is None:
            return ()

        result = []
        login_attr = acl.getProperty('_login_attr')
        uid_attr = acl.getProperty('_uid_attr')
        rdn_attr = acl.getProperty('_rdnattr')
        plugin_id = self.getId()
        edit_url = '%s/%s/manage_userrecords' % (plugin_id, acl.getId())

        if exact_match and (id or login):
            if id:
                ldap_user = acl.getUserById(id)
            else:
                ldap_user = acl.getUser(login)

            if ldap_user is not None:
                qs = 'user_dn=%s' % quote_plus(ldap_user.getUserDN())
                result.append( { 'id' : ldap_user.getId()
                               , 'login' : ldap_user.getProperty(login_attr)
                               , 'pluginid' : plugin_id
                               , 'editurl' : '%s?%s' % (edit_url, qs)
                               } )
        else:
            # Search filter handed to the LDAPUserFolder; deliberately a
            # separate mapping from the cache keywords above.
            search_criteria = {}

            if id:
                if uid_attr == 'dn':
                    # Workaround: Due to the way findUser reacts when a DN
                    # is searched for I need to hack around it... This
                    # limits the usefulness of searching by ID if the user
                    # folder uses the full DN as user ID.
                    search_criteria[rdn_attr] = id
                else:
                    search_criteria[uid_attr] = id

            if login:
                search_criteria[login_attr] = login

            # Extra keyword criteria must not override the id/login
            # criteria set above.
            for key, val in kw.items():
                if key not in (login_attr, uid_attr):
                    search_criteria[key] = val

            l_results = acl.searchUsers(**search_criteria)

            # DNs already emitted; a dict gives constant-time lookups.
            seen = {}

            for l_res in l_results:
                dn = l_res['dn']
                if dn in seen:
                    continue

                seen[dn] = 1
                l_res['id'] = l_res[uid_attr]
                l_res['login'] = l_res[login_attr]
                l_res['pluginid'] = plugin_id
                quoted_dn = quote_plus(dn)
                l_res['editurl'] = '%s?user_dn=%s' % (edit_url, quoted_dn)
                result.append(l_res)

            if sort_by is not None:
                result.sort(lambda a, b: cmp( a.get(sort_by, '').lower()
                                            , b.get(sort_by, '').lower()
                                            ) )

            # Keep exactly max_results entries
            if isinstance(max_results, int) and len(result) > max_results:
                result = result[:max_results]

        result = tuple(result)
        self.ZCacheable_set( result
                           , view_name=view_name
                           , keywords=cache_keywords
                           )

        return result


    security.declarePrivate('enumerateGroups')
    def enumerateGroups( self
                       , id=None
                       , exact_match=False
                       , sort_by=None
                       , max_results=None
                       , **kw
                       ):
        """ Fulfill the GroupEnumerationPlugin requirements

        Returns a tuple of group records as delivered by the
        LDAPUserFolder's searchGroups(), each extended with the keys
        'pluginid' and 'id' (taken from the record's self.groupid_attr
        value). At most 'max_results' records are returned when that is
        an integer.

        'sort_by' only takes part in the cache key; the records are
        returned in the order searchGroups() delivers them.

        Returns an empty tuple when no LDAPUserFolder is available or
        when the search reports the single placeholder record whose
        'cn' is 'n/a'.

        Raises ValueError if exact_match is requested without an id.
        """
        view_name = self.getId() + '_enumerateGroups'
        criteria = {'id':id, 'exact_match':exact_match,
                    'sort_by':sort_by, 'max_results':max_results}
        criteria.update(kw)

        cached_info = self.ZCacheable_get(view_name = view_name,
                                          keywords = criteria,
                                          default = None)

        if cached_info is not None:
            logger.debug('returning cached results from enumerateGroups')
            return cached_info

        acl = self._getLDAPUserFolder()

        if acl is None:
            return ()

        if id is not None and exact_match and not kw:
            # likely from a PAS.getUserById(). In any case 'id' and
            # 'exact_match' means only a single result should be
            # available so try to fetch specific group info from
            # cache. A non-exact search must not take this shortcut
            # because it may legitimately match several groups.
            group_info = self._getGroupInfoCache(id)
            if group_info is not None:
                return (group_info,)

        if id is None and exact_match:
            raise ValueError('Exact Match requested but no id provided')
        elif id is not None:
            kw[self.groupid_attr] = id

        plugin_id = self.getId()

        results = acl.searchGroups(exact_match=exact_match, **kw)

        if len(results) == 1 and results[0]['cn'] == 'n/a':
            # we didn't give enough known criteria for searches
            return ()

        # Keep exactly max_results entries
        if isinstance(max_results, int) and len(results) > max_results:
            results = results[:max_results]

        for rec in results:
            rec['pluginid'] = plugin_id
            rec['id'] = rec[self.groupid_attr]
            self._setGroupInfoCache(rec)

        results = tuple(results)
        self.ZCacheable_set(results, view_name=view_name, keywords=criteria)

        return results


    security.declarePrivate('enumerateRoles')
    def enumerateRoles( self
                      , id=None
                      , exact_match=0
                      , sort_by=None
                      , max_results=None
                      , **kw
                      ):
        """ Fulfill the RoleEnumerationPlugin requirements

        Passes all arguments on to enumerateGroups() and returns its
        result unchanged.
        """
        # For LDAP, roles and groups are really one and the same thing.
        # We can simply call enumerateGroups here.
        return self.enumerateGroups( id=id
                                   , exact_match=exact_match
                                   , sort_by=sort_by
                                   , max_results=max_results
                                   , **kw
                                   )

# Declare the PAS plugin interfaces LDAPMultiPlugin provides: the four
# listed explicitly plus everything implementedBy() reports for the
# LDAPPluginBase base class.
classImplements( LDAPMultiPlugin
               , IUserEnumerationPlugin
               , IGroupsPlugin
               , IGroupEnumerationPlugin
               , IRoleEnumerationPlugin
               , *implementedBy(LDAPPluginBase)
               )

# Run Zope's class initialization for the plugin class.
# NOTE(review): this is presumably what puts the ClassSecurityInfo
# declarations from the class body into effect -- confirm against the
# Zope version in use.
InitializeClass(LDAPMultiPlugin)


# Listing generated by Doxygen 1.6.0.