This snippet shows a simple class to manage a connection to eDirectory in Python. It uses the __connect, __search, __create, __delete, __rename, and __modify functions that can also be found in other code snippets. It also uses the CN and DN classes that are defined in other code snippets.
import ldap
class LDAPConnection:
    """Manage a single, lazily opened LDAP connection (e.g. to eDirectory).

    The connection parameters are stored at construction time; the actual
    bind happens on the first call to a public method (or explicitly via
    TestConnection) and the resulting handle is reused afterwards.

    Distinguished names passed to the public methods must be objects that
    provide a toString() method (the DN/CN classes defined elsewhere).

    Unless stated otherwise, errors reported by the LDAP library
    (ldap.LDAPError and subclasses) propagate to the caller.
    """

    __port = 0
    # Cached connection handle. It must exist before any public method runs,
    # because every method checks it to decide whether to (re)connect.
    __ldap_connection_handle = None

    def __init__( self, host, port, binddn, password, scope ):
        """Store the connection settings; no network traffic happens here.

        host     -- server host name or address
        port     -- server port
        binddn   -- bind DN, wrapped in a CN object
        password -- password used for the simple bind
        scope    -- "SUB" (subtree), "ONE" (one level) or anything else
                    for a base search; case-insensitive
        """
        self.__host = host
        self.__port = port
        self.__binddn = CN( binddn )
        self.__password = password
        self.__ldap_connection_handle = None
        # python-ldap names the one-level scope SCOPE_ONELEVEL.
        scopes = {
            "SUB": ldap.SCOPE_SUBTREE,
            "ONE": ldap.SCOPE_ONELEVEL,
        }
        self.__scope = scopes.get( scope.upper(), ldap.SCOPE_BASE )

    def __connect( self, host, binddn, password, port=389 ):
        """Open a connection to host:port and do a simple bind.

        Returns the bound handle, or False if no handle could be created.
        """
        # ldap.open() is deprecated/removed; ldap.initialize() with an
        # ldap:// URI is the supported equivalent.
        handle = ldap.initialize( "ldap://%s:%s" % ( host, port ) )
        if handle:
            handle.simple_bind_s( binddn, password )
            return handle
        return False

    def __ensure_connected( self ):
        """Return the cached handle, connecting first if there is none."""
        if not self.__ldap_connection_handle:
            self.__ldap_connection_handle = self.__connect(
                self.__host,
                self.__binddn.toString(),
                self.__password,
                self.__port )
        return self.__ldap_connection_handle

    def __search( self, handle, basedn, filter, scope=ldap.SCOPE_SUBTREE):
        """Run a synchronous search; returns False when handle is unusable."""
        if not handle:
            return False
        return handle.search_s( basedn, scope, filter )

    def __create( self, handle, dn, attrs ):
        """Add the entry dn with attrs; returns False when handle is unusable."""
        if not handle:
            return False
        handle.add_s( dn, attrs )
        return True

    def __delete( self, handle, dn ):
        """Delete the entry dn; returns False when handle is unusable."""
        if not handle:
            return False
        handle.delete_s( dn )
        return True

    def __rename( self, handle, olddn, newdn, delete_olddn=1 ):
        """Rename olddn via modrdn_s; returns False when handle is unusable."""
        if not handle:
            return False
        # NOTE(review): python-ldap documents the second argument of
        # modrdn_s as the new RDN, not a full DN -- confirm what callers pass.
        handle.modrdn_s( olddn, newdn, delete_olddn )
        return True

    def __modify( self, handle, dn, attrs ):
        """Apply the modification list attrs to dn; False if no handle."""
        if not handle:
            return False
        handle.modify_s( dn, attrs )
        return True

    def __modify_with_op( self, dn, op, pairs ):
        """Apply the same modification op to every (attribute, value) pair."""
        handle = self.__ensure_connected()
        if not handle:
            return False
        mods = [ ( op, attr, value ) for attr, value in pairs ]
        self.__modify( handle, dn.toString(), mods )
        return True

    def TestConnection( self, basedn, filter ):
        """Connect (always with a fresh bind) and run a test search.

        Returns True only if the bind succeeds and the search under basedn
        with filter yields at least one entry. Returns False if the
        connection or the search fails, or if nothing is found -- so it is
        important to provide real search data.
        """
        try:
            handle = self.__connect(
                self.__host,
                self.__binddn.toString(),
                self.__password,
                self.__port )
        except ldap.LDAPError:
            # Connection/bind problems surface as exceptions; the contract
            # of this method is to report them as False.
            handle = None
        # Never keep a stale or failed handle around.
        self.__ldap_connection_handle = handle or None
        if not handle:
            return False
        try:
            results = self.__search( handle, basedn, filter, self.__scope )
        except ldap.LDAPError:
            return False
        return len( results ) != 0

    def GetAllUserObjects( self, basedn, filter ):
        """Search under basedn with filter using the configured scope.

        Returns the search_s result, or False if no connection handle
        could be obtained.
        """
        handle = self.__ensure_connected()
        if not handle:
            return False
        return self.__search( handle, basedn, filter, self.__scope )

    def CreateObject( self, dn, attrs ):
        """Create the entry dn with attrs. Returns True, or False if not connected."""
        handle = self.__ensure_connected()
        if not handle:
            return False
        self.__create( handle, dn.toString(), attrs )
        return True

    def DeleteObject( self, dn ):
        """Delete the entry dn. Returns True, or False if not connected."""
        handle = self.__ensure_connected()
        if not handle:
            return False
        self.__delete( handle, dn.toString() )
        return True

    def RenameObject( self, olddn, newdn, del_olddn=True ):
        """Rename olddn to newdn.

        del_olddn -- when true the old RDN value is removed from the entry.
        Returns True, or False if not connected.
        """
        handle = self.__ensure_connected()
        if not handle:
            return False
        if del_olddn:
            delete_flag = 1
        else:
            delete_flag = 0
        self.__rename( handle, olddn.toString(), newdn.toString(), delete_flag )
        return True

    def AddAttributes( self, dn, newattrs ):
        """Add each (attribute, value) pair in newattrs to dn (MOD_ADD).

        Returns True, or False if not connected.
        """
        return self.__modify_with_op( dn, ldap.MOD_ADD, newattrs )

    def ModifyAttributes( self, dn, changedattrs ):
        """Replace each (attribute, value) pair in changedattrs on dn (MOD_REPLACE).

        Returns True, or False if not connected.
        """
        return self.__modify_with_op( dn, ldap.MOD_REPLACE, changedattrs )

    def DeleteAttributes( self, dn, delattrs ):
        """Delete each (attribute, value) pair in delattrs from dn (MOD_DELETE).

        Returns True, or False if not connected.
        """
        return self.__modify_with_op( dn, ldap.MOD_DELETE, delattrs )

    def ModifyObject( self, dn, attrs ):
        """Apply a ready-made modification list attrs to dn.

        attrs is passed to modify_s unchanged, so it must already be a list
        of (operation, attribute, value) tuples.
        Returns True, or False if not connected.
        """
        handle = self.__ensure_connected()
        if not handle:
            return False
        self.__modify( handle, dn.toString(), attrs )
        return True
© 2008 Novell, Inc. All Rights Reserved.