##############################################################################
#
# Copyright (c) 2016 Nexedi SARL and Contributors. All Rights Reserved.
#                    Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from functools import partial
from Products.ERP5Type.Globals import InitializeClass
from AccessControl import ClassSecurityInfo
from AuthEncoding.AuthEncoding import pw_validate
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.interfaces.plugins import IAuthenticationPlugin
from Products.PluggableAuthService.interfaces.plugins import IUserEnumerationPlugin
from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
from DateTime import DateTime
from Products import ERP5Security
from AccessControl import SpecialUsers
from Shared.DC.ZRDB.DA import DatabaseError
from zLOG import LOG, ERROR
# User name of Zope's "system" special user, as reported by AccessControl.
SYSTEM_USER_USER_NAME = SpecialUsers.system.getUserName()
# To prevent login thieves
# Note: despite its name this is a tuple; the code in this file only uses it
# for membership tests.
SPECIAL_USER_NAME_SET = (
  ERP5Security.SUPER_USER,
  SpecialUsers.nobody.getUserName(),
  SYSTEM_USER_USER_NAME,
  # Note: adding emergency_user is pointless as its login is variable, so no
  # way to prevent another user from stealing its login.
)

# Management form used to add this plugin to a Pluggable Auth Service.
manage_addERP5LoginUserManagerForm = PageTemplateFile(
  'www/ERP5Security_addERP5LoginUserManager', globals(),
  __name__='manage_addERP5LoginUserManagerForm' )

def addERP5LoginUserManager(dispatcher, id, title=None, RESPONSE=None):
  """ Add a ERP5LoginUserManager to a Pluggable Auth Service.

  dispatcher -- container receiving the new plugin (its _setObject is called)
  id         -- id of the plugin to create
  title      -- optional title of the plugin
  RESPONSE   -- when given, it is redirected to the plugin's 'manage_main'
  """
  plugin = ERP5LoginUserManager(id, title)
  dispatcher._setObject(plugin.getId(), plugin)
  if RESPONSE is None:
    # Nothing to redirect (e.g. called from code rather than from a form).
    return
  manage_url = plugin.absolute_url() + '/manage_main'
  RESPONSE.redirect(manage_url)

class ERP5LoginUserManager(BasePlugin):
  """ PAS plugin for managing users in ERP5

  Authenticates credentials against login documents (by default of portal
  type 'ERP5 Login') contained in user documents, and enumerates users
  either by user id or by login reference.
  """
  meta_type = 'ERP5 Login User Manager'
  login_portal_type = 'ERP5 Login'
  security = ClassSecurityInfo()

  def __init__(self, id, title=None):
    self._id = self.id = id
    self.title = title

  #
  #   IAuthenticationPlugin implementation
  #
  security.declarePrivate('authenticateCredentials')
  def authenticateCredentials(self, credentials):
    """ See IAuthenticationPlugin.

    credentials -- mapping which may contain:
      'login_portal_type'  : portal type of the login document to look for
                             (defaults to self.login_portal_type)
      'external_login'     : login already validated by the extractor
      'login_relative_url' : path to the login document, already validated
                             by the extractor
      'login' / 'password' : traditional credentials, checked here

    Returns a (user id, login reference) tuple on success, None otherwise.
    """
    login_portal_type = credentials.get(
      'login_portal_type',
      self.login_portal_type,
    )
    if 'external_login' in credentials:
      # External plugin: extractor plugin can validate credential validity.
      # Our job is to locate the actual user and check related documents
      # (assignments...).
      check_password = False
      login_value = self._getLoginValueFromLogin(
        credentials.get('external_login'),
        login_portal_type=login_portal_type,
      )
    elif 'login_relative_url' in credentials:
      # Path-based login: extractor plugin can validate credential validity and
      # directly locate the login document. Our job is to check related
      # documents (assignments...).
      check_password = False
      login_value = self.getPortalObject().unrestrictedTraverse(
        credentials.get("login_relative_url"),
      )
    else:
      # Traditional login: find login document from credentials, check password
      # and check related documents (assignments...).
      check_password = True
      login_value = self._getLoginValueFromLogin(
        credentials.get('login'),
        login_portal_type=login_portal_type,
      )
    if login_value is None:
      return
    # The user document is the container of the login document.
    user_value = login_value.getParentValue()
    if not self._isUserValueValid(user_value):
      return

    is_authentication_policy_enabled = self.getPortalObject().portal_preferences.isAuthenticationPolicyEnabled()
    if check_password:
      password = credentials.get('password')
      login_password = login_value.getPassword()
      if (not password
          or login_password is None
          or not pw_validate(login_password, password)):
        if is_authentication_policy_enabled:
          # Notify a failure at most once per login document and per
          # transaction: the transactional variable remembers it was done.
          tv = getTransactionalVariable()
          login_failure_key = 'notified_login_failure_' + login_value.getRelativeUrl()
          if tv.get(login_failure_key) is None:
            login_value.notifyLoginFailure()
            tv[login_failure_key] = 1
        return
    if is_authentication_policy_enabled:
      if login_value.isPasswordExpired():
        login_value.notifyPasswordExpire()
        return
      if login_value.isLoginBlocked():
        return
    return (user_value.getUserId(), login_value.getReference())

  def _isUserValueValid(self, user_value):
    """ Tell whether the given user document is allowed to authenticate.

    A user is rejected when it has no user id or is in 'deleted' validation
    state. A 'Person' must additionally contain at least one 'Assignment' in
    'open' validation state whose start/stop dates (when set) include now.

    Returns True when the user is valid, False otherwise.
    """
    if not user_value.hasUserId():
      return False
    if user_value.getValidationState() == 'deleted':
      return False
    if user_value.getPortalType() in ('Person', ):
      now = DateTime()
      for assignment in user_value.contentValues(portal_type="Assignment"):
        if assignment.getValidationState() == "open" and (
          not assignment.hasStartDate() or assignment.getStartDate() <= now
        ) and (
          not assignment.hasStopDate() or assignment.getStopDate() >= now
        ):
          return True
      # No currently-open assignment found.
      return False
    return True

  def _getLoginValueFromLogin(self, login, login_portal_type=None):
    """ Return the login document matching exactly the given login.

    Returns None when nothing is found, when the catalog is not functional,
    or when the match has no path (which is how special users are listed by
    enumerateUsers).
    The tuple unpackings below raise ValueError when more than one user, or
    more than one login for the user, is found.
    """
    try:
      user_list = self.enumerateUsers(
        login=login,
        exact_match=True,
        login_portal_type=login_portal_type,
      )
    except DatabaseError:
      # DatabaseError gets raised when catalog is not functional. In which case
      # it should be fine to bail without any user, letting PAS continue trying
      # other plugins.
      LOG(
        repr(self),
        ERROR,
        'enumerateUsers raised, bailing',
        error=True,
      )
      user_list = []
    if not user_list:
      return
    single_user, = user_list
    single_login, = single_user['login_list']
    path = single_login['path']
    if path is None:
      return
    return self.getPortalObject().unrestrictedTraverse(path)

  #
  #   IUserEnumerationPlugin implementation
  #
  security.declarePrivate('enumerateUsers')
  def enumerateUsers(self, id=None, login=None, exact_match=False,
             sort_by=None, max_results=None, login_portal_type=None, **kw):
    """ See IUserEnumerationPlugin.

    id                -- user id, or sequence of user ids, to search for.
                         Only used when login is None.
    login             -- login, or sequence of logins, to search for.
    exact_match       -- when true, only return exact matches.
    sort_by           -- accepted for interface compatibility, not used here.
    max_results       -- passed as 'limit' to the catalog queries.
    login_portal_type -- portal type(s) of login documents to consider,
                         defaults to portal.getPortalLoginTypeList().

    Returns a tuple of dicts with keys 'id', 'login', 'pluginid', 'path',
    'uid' and 'login_list' (a list of dicts with 'reference', 'path', 'uid').
    """
    portal = self.getPortalObject()
    if login_portal_type is None:
      login_portal_type = portal.getPortalLoginTypeList()
    unrestrictedSearchResults = portal.portal_catalog.unrestrictedSearchResults
    searchUser = lambda **search_kw: unrestrictedSearchResults(
      select_list=('user_id', ),
      **search_kw
    ).dictionaries()
    searchLogin = lambda **search_kw: unrestrictedSearchResults(
      select_list=('parent_uid', 'reference'),
      validation_state='validated',
      **search_kw
    ).dictionaries()
    if login_portal_type is not None:
      searchLogin = partial(searchLogin, portal_type=login_portal_type)
    special_user_name_set = set()
    # What the caller asked for, once normalised. Kept under dedicated names
    # because they are needed again below for the transactional user, and
    # must not be clobbered by loop variables in between.
    requested_id_list = ()
    requested_login_list = ()
    # Maps user uid -> list of catalog rows of its login documents.
    login_dict = {}
    if login is None:
      # Only search by id if login is not given. Same logic as in
      # PluggableAuthService.searchUsers.
      if isinstance(id, str):
        id = (id, )
      # Short-cut "System Processes" as not being searchable by user_id.
      # This improves performance in proxy-role'd execution by avoiding an
      # sql query expected to find no user.
      id = [x for x in id or () if x != SYSTEM_USER_USER_NAME]
      requested_id_list = id
      if id:
        if exact_match:
          requested = set(id).__contains__
        else:
          requested = lambda x: True
        user_list = [
          x for x in searchUser(
            user_id={
              'query': id,
              'key': 'ExactMatch' if exact_match else 'Keyword',
            },
            limit=max_results,
          )
          if requested(x['user_id'])
        ]
      else:
        user_list = []
      if user_list:
        for login_row in searchLogin(parent_uid=[x['uid'] for x in user_list]):
          login_dict.setdefault(login_row['parent_uid'], []).append(login_row)
    else:
      if isinstance(login, str):
        login = (login, )
      login_list = []
      for user_login in login:
        if user_login in SPECIAL_USER_NAME_SET:
          special_user_name_set.add(user_login)
        else:
          login_list.append(user_login)
          # Ignore leading or trailing space in login name
          user_login_stripped = user_login.strip()
          if user_login_stripped != user_login:
            login_list.append(user_login_stripped)
      requested_login_list = login_list
      if exact_match:
        requested = set(login_list).__contains__
      else:
        requested = lambda x: True
      if login_list:
        for login_row in searchLogin(
          reference={
            'query': login_list,
            'key': 'ExactMatch' if exact_match else 'Keyword',
          },
          limit=max_results,
        ):
          if requested(login_row['reference']):
            login_dict.setdefault(login_row['parent_uid'], []).append(login_row)
      if login_dict:
        user_list = searchUser(uid=list(login_dict))
      else:
        user_list = []
    plugin_id = self.getId()
    result = [
      {
        'id': user['user_id'],
        # Note: PAS forbids us from returning more than one entry per given id,
        # so take any available login.
        'login': login_dict.get(user['uid'], [{'reference': None}])[0]['reference'],
        'pluginid': plugin_id,

        # Extra properties, specific to ERP5
        'path': user['path'],
        'uid': user['uid'],
        'login_list': [
          {
            'reference': login_row['reference'],
            'path': login_row['path'],
            'uid': login_row['uid'],
          }
          for login_row in login_dict.get(user['uid'], [])
        ],
      }
      for user in user_list if user['user_id']
    ]

    # A user document may be stored under the "transactional_user" key of the
    # transactional variable. When present and valid, it is listed from its
    # own content (not from the catalog) if it matches the request.
    tv = getTransactionalVariable()
    user_value = tv.get("transactional_user", None)
    if user_value is not None and self._isUserValueValid(user_value):
      login_value_list = [
        login_value
        for login_value in user_value.objectValues(login_portal_type)
        if login_value.getValidationState() == 'validated'
          and login_value.getPassword() is not None
      ]
      if login_value_list:
        transactional_user_id = user_value.getUserId()
        # At most one of requested_id_list / requested_login_list is filled,
        # depending on whether the search is by id or by login.
        is_requested = transactional_user_id in requested_id_list or any(
          login_value.getReference() in requested_login_list
          for login_value in login_value_list
        )
        # PAS forbids more than one entry per id: skip the transactional user
        # if the catalog already returned it.
        is_already_listed = any(
          entry['id'] == transactional_user_id for entry in result
        )
        if is_requested and not is_already_listed:
          result.append({
            'id': transactional_user_id,
            # Note: PAS forbids us from returning more than one entry per
            # given id, so take any available login.
            'login': login_value_list[0].getReference(),
            'pluginid': plugin_id,

            # Extra properties, specific to ERP5
            'path': user_value.getPath(),
            'uid': user_value.getUid(),
            'login_list': [
              {
                'reference': login_value.getReference(),
                'path': login_value.getRelativeUrl(),
                'uid': login_value.getUid(),
              } for login_value in login_value_list
            ],
          })

    for special_user_name in special_user_name_set:
      # Note: special users are a bastard design in Zope: they are expected to
      # have a user name (aka, a login), but no id (aka, they do not exist as
      # users). This is likely done to prevent them from having any local role
      # (ownership or otherwise). In reality, they should have an id (they do
      # exist, and user ids are some internal detail where it is easy to avoid
      # such magic strings) and no login (because nobody should ever be able to
      # log in as a special user, and logins are exposed to users (duh !) and
      # hence magic values are impossible to avoid with ad-hoc code peppered
      # everywhere). To avoid such ad-hoc code, this plugin finds magic users,
      # so that code checking whether a login already exists before allowing
      # it to be used will see them, preventing misleading logins from being
      # misused.
      result.append({
        'id': special_user_name,
        'login': special_user_name,
        'pluginid': plugin_id,

        'path': None,
        'uid': None,
        'login_list': [
          {
            'reference': special_user_name,
            'path': None,
            'uid': None,
          }
        ]
      })
    return tuple(result)

  security.declarePrivate('updateUser')
  def updateUser(self, user_id, login_name):
    """ Changing a user's login name is not supported: always return False.
    """
    # Operation not supported here
    return False

  security.declarePrivate('updateEveryLoginName')
  def updateEveryLoginName(self, quit_on_first_error=True):
    """ Updating every login name is not supported: always raise
    NotImplementedError.
    """
    # Operation not supported here
    raise NotImplementedError()


# Register the class with PAS as providing the authentication and user
# enumeration plugin interfaces, then let Zope initialise it (presumably this
# is what applies the ClassSecurityInfo declarations made in the class body).
classImplements(ERP5LoginUserManager, IAuthenticationPlugin, IUserEnumerationPlugin)
InitializeClass(ERP5LoginUserManager)