'''RESTfulAuth''' I wrote this tool, based on the !DigestAuth tool shipped with CherryPy, because standard digest auth per RFC2617 does not offer any way of logging a user out or of time-limiting the use of the authentication credentials. I wanted an authentication scheme which would work well with my app which is designed on RESTful lines - don't add new protocols if HTTP will do the job. In particular I wanted to avoid the use (and added complexity) of cookies and sessions. The RFC does suggest the use of expiring nonces as a way of mitigating replay attacks. Expiring nonces also allow all the extra features I wanted to be included in digest authentication. So this tool extends the standard !DigestAuth tool by: 1. Allowing the time for which a nonce is valid to be specified so that a user is automatically logged out when the nonce expires. 2. Allowing a nonce refresh period to be specified to provide some protection against replay attacks. 3. Allowing the user to log out by visiting a link which expires all her nonces. 4. Providing a simple role based authorisation scheme. 5. Allows authorisation to be specified per-method and per URI. 6. Facilitates the use of this tool in an suexec CGI or mod_wsgi environment by allowing the HTTP Authorization header to be passed in with a different name. All the above are configured on a per-URI basis in the CherryPy config. This scheme is susceptible to all the attacks described in the RFC with the exception that it allows replay attacks to be mitigated. But for any serious application SSL will mitigate the remaining attacks. Server side security (eg of the password database) is not addressed by this tool. '''Changes from DigestAuth''' The nonce storage mechanism is added. This uses the filesystem as a database. The nonce files created are never read - the tool relies entirely on the nonce file metadata. The nonces are generated by the tool - the standard !DigestAuth tool will frequently produce identical nonces since the timestamp it uses has only a 1 second granularity. '''Caveats''' This tool was designed for use in a low traffic environment with a total user count in the hundreds. I'm not a security expert, but I believe that this tool is at least as secure as the built-in !DigestAuth tool. The test cases (after the code) are not terribly comprehensive. {{{ """ Authentication and authorisation tool for RESTful apps. Authentication is performed by expiring-nonce HTTP digest authentication, providing a means of time-limiting logins and a means of logging users out. Authorisation uses role-based access control. Tool parameters are: realm: a string containing the authentication realm. users: a dict of the form: {username: (password, role1, ...)} or a callable returning the password/roles tuple when passed the username. refresh: an integer giving the refresh time for nonces in seconds. expires: an integer giving the maximum life of a nonce (from the last refresh if refreshed). Zero indicates a one-time nonce. If less than 'refresh' (but not zero) 'refresh' will be used as the expiry time. logout: a boolean value. True indicates that all nonces originating from the client making the request to this resource should be expired (regardless of realm). This allows a 'logout' function to be implemented without the use of javascript, cookies or .htaccess magic. A 'logout' link should point to a non-cacheable resource that is protected by restful_auth with this flag set. Default is False. roles: a dict of the form {method1: (role1, role2, ...), ....} The roles for the current method are compared to the user's roles as returned from the 'users' parameter. If no match is found the authentication fails. The default is an empty dict, in which case this tool does not check roles - any authenticated user will be authorised by this tool. The roles are available to the page handler in cherrypy.resource.loginroles. If the 'expires' parameter is greater than 'refresh', then when an nonce has been issued for the time given by the 'refresh' parameter a new nonce will be sent to the client together with the 'stale=true' flag, avoiding the need for the user to re-enter her credentials. The 'refresh' timer is then restarted. If both the 'refresh' and the 'expires' times have been exceeded then the 'stale' flag will not be sent and the user must re-authenticate. Some examples of using the restful-auth tool: The root of the application tree. Here we set up the default parameters for the application tree. The nonce will be refreshed every minute and will expire if there is no activity for 10 minutes. app = main.AppRoot(config, {'tools.restful_auth.on' : False, 'tools.restful_auth.realm': 'Application Realm', 'tools.restful_auth.users': lib.get_users, 'tools.restful_auth.refresh': 60, 'tools.restful_auth.expires': 600}) This URI expires all the nonces from this host. app.logout = main.Logout(config, {'tools.restful_auth.on': True, 'tools.restful_auth.logout': True}) Here we set up the access rights for this branch of the tree. Guests and users can read the resource, users can modify it, and administrators can create new resources. In the users dict administrators would have both the 'admin' and the 'user' roles. app.branch = uri.branch(config, {'tools.restful_auth.roles': {'GET': ('guest', 'user'), 'PUT': ('user'), 'POST': ('admins')}}) Anyone can access a this leaf - restful_auth is off. app.branch.leaf1 = uri.Leaf1(config) Only users with the appropriate roles can access this leaf. app.branch.leaf2 = uri.Leaf2(config, {'tools.restful_auth.on' : True}) This leaf does something serious like modify application settings, so we specify a new realm, a one-time nonce so that every access must be authenticated, and only allow access to a user with the superadmin role. app.branch.leaf3 = uri.Leaf3(config, {'tools.restful_auth.on':True, 'tools.restful_auth.realm':'Administrator realm', 'tools.restful_auth.users': lib.get_users, 'tools.restful_auth.refresh': 15, 'tools.restful_auth.expires': 0, 'tools.restful_auth.roles': {'GET': ('superadmin',), 'POST': ('superadmin',)}}) Copyright (c) PR Hardman 2007. This module is free software, and you may redistribute it and/or modify it under the same terms as Python itself, so long as this copyright message and disclaimer are retained in their original form. IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. Created 5 December 2007 """ import time import os import glob import md5 import random import cherrypy import cherrypy.lib.httpauth as httpauth #------------------------------- restful_auth tool ---------------------# NONCE_DIR = os.path.join(os.path.dirname(__file__), ".nonce") N_EXPIRED = 1 N_REFRESH = 2 N_VALID = 3 STALE_WWW_AUTH = 'Digest realm="%s", nonce="%s", algorithm="MD5", qop="auth", stale=true' STD_WWW_AUTH = 'Digest realm="%s", nonce="%s", algorithm="MD5", qop="auth"' def nonce_state(nonce, expires): """ Report the nonce state. A nonce may be: 1) Expired: Doesn't exist or both 'refresh' and 'expires' exceeded 2) Refresh due: 'refresh' exceeded but 'expires' not exceeded 3) One-time: 'expires' is 0. Expire forcibly (delete). 3) Valid: none of the above """ fn = '%s-%s' % (nonce, cherrypy.request.remote.ip) filename = os.path.join(NONCE_DIR, fn) if not os.path.exists(filename): # Already expired return N_EXPIRED stats = os.stat(filename) if stats.st_mtime < time.time(): # Expire the nonce os.remove(filename) return N_EXPIRED elif stats.st_atime < time.time(): os.remove(filename) return N_REFRESH if expires == 0: os.remove(filename) return N_VALID return N_VALID def record_nonce(nonce, refresh, expires): """ Record the nonce together with it's expiry time and the client IP """ if not os.path.exists(NONCE_DIR): os.mkdir(NONCE_DIR, 0700) # The nonce is a hex digest so it can be used as # a filename with the client ip appended. fn = '%s-%s' % (nonce, cherrypy.request.remote.ip) filename = os.path.join(NONCE_DIR, fn) # expire any stale nonces for name in glob.glob(os.path.join(NONCE_DIR,'*')): if os.stat(name).st_mtime < time.time(): os.remove(name) # Record this nonce/IP try: fd = os.open(filename, os.O_RDONLY | os.O_CREAT | os.O_EXCL) except: raise os.close(fd) # Mark when this nonce will expire now = int(time.time()) if expires > refresh: os.utime(filename, (now + refresh, now + expires)) else: os.utime(filename, (now + refresh, now + refresh)) return def do_logout(): """ Expire all nonces from the current client """ client_ip = cherrypy.request.remote.ip for name in glob.glob(os.path.join(NONCE_DIR,'*')): if name not in ('.', '..'): if name[name.rfind('-') + 1:] == client_ip: os.remove(name) return def check_roles(resource_roles, user_roles): """ Check that one of the resource roles matches the list of user roles """ for role in resource_roles: if role in user_roles: return True return False def check_auth(users, realm, expires, roles): """If an authorization header contains valid credentials return True, else return False. If the nonce has expired but the expires time has not been exceeded return the 'stale' flag. If the application is running under CGI/suexec or mod_wsgi without the WSGIPassAuthorization directive then the authorization header (which will be removed by suexec/mod_wsgi) must be written to the x-dbappauth header by mod-rewrite and mod_headers in the application's .htaccess file as follows: RewriteEngine on RewriteCond %{REQUEST_FILENAME} !-f RewriteRule ^(.*)$ /cgi-bin/run_cgi.cgi/$1 [E=MY_HTTP_AUTH:%{HTTP:Authorization},QSA,PT,L] RequestHeader set x-myappauth "%{MY_HTTP_AUTH}e" env=MY_HTTP_AUTH Based on cherrypy.lib.auth.check_auth() """ auth = False stale = False if 'authorization' in cherrypy.request.headers: auth_hdr = cherrypy.request.headers['authorization'] elif 'x-myappauth' in cherrypy.request.headers: auth_hdr = cherrypy.request.headers['x-myappauth'] if not auth_hdr: return auth, stale else: return auth, stale # make sure the provided credentials are correctly set ah = httpauth.parseAuthorization(auth_hdr) if ah is None: raise cherrypy.HTTPError(400, 'Bad Request') # These lines only required for CherryPy pre 3.1.0rc1 if ah['realm'] != realm: return auth, stale nstate = nonce_state(ah['nonce'], expires) if nstate == N_EXPIRED: # No need to validate the authorization return auth, stale elif nstate == N_REFRESH: stale = True encrypt = httpauth.DIGEST_AUTH_ENCODERS[httpauth.MD5] if callable(users): # A callable must return a tuple userinfo = users(ah["username"]) else: if not isinstance(users, dict): raise ValueError, "Authentication users must be a dict of tuples" # fetch the user password userinfo = users.get(ah["username"], None) if not isinstance(userinfo, tuple): raise ValueError, "Authentication users must be a dict of tuples" if userinfo: # Check that the user's role matches the role allowed if not roles or check_roles(roles.get(cherrypy.request.method), userinfo[1:]): # Validate the authorization by re-computing it here # and compare it with what the user-agent provided if httpauth.checkResponse(ah, userinfo[0], method=cherrypy.request.method, encrypt=encrypt, realm=realm): cherrypy.request.login = ah["username"] cherrypy.request.loginroles = userinfo[1:] return True, stale cherrypy.request.login = False cherrypy.request.loginroles = () return auth, stale def restful_auth(realm, users, refresh, expires, logout=False, roles=None): """ Authentication and authorisation tool for RESTful apps. Authentication is performed by nonce-expiring HTTP digest authentication providing a means of time-limiting logins an a means of logging users out. Authorisation uses role-based access control. Tool parameters are: realm: a string containing the authentication realm. users: a dict of the form: {username: (password, role1, ...)} or a callable returning the password/roles tuple when passed the username. refresh: an integer giving the refresh time for nonces in seconds. expires: an integer giving the maximum life of a nonce (from the last refresh if refreshed). Zero indicates a one-time nonce. If less than 'refresh' (but not zero) 'refresh' will be used as the expiry time. logout: a boolean value. True indicates that all nonces originating from the client making the request to this resource should be expired (regardless of realm). This allows a 'logout' function to be implemented without the use of javascript, cookies or .htaccess magic. A 'logout' link should point to a non-cacheable resource that is protected by restful_auth with this flag set. Default is False. roles: a dict of the form {method1: (role1, role2, ...), ....} The roles for the current method are compared to the user's roles as returned from the 'users' parameter. If no match is found the authentication fails. The default is an empty dict, in which case this tool does not check roles - any authenticated user will be authorised by this tool. The roles are available to the page handler in cherrypy.resource.loginroles. If the 'expires' parameter is greater than 'refresh', then when an nonce has been issued for the time given by the 'refresh' parameter a new nonce will be sent to the client together with the 'stale=true' flag, avoiding the need for the user to re-enter her credentials. The 'refresh' timer is then restarted. If both the 'refresh' and the 'expires' times have been exceeded then the 'stale' flag will not be sent and the user must re-authenticate. """ if logout: do_logout() return auth, stale = check_auth(users, realm, expires, roles) # Calculate the nonce and build the www-authenticate header here # rather than use the function in httpauth because that function may give # duplicate nonces since it only uses the seconds from time.time(). if auth: if not stale: return else: nonce = md5.new("%d:%s:%.6f" % (time.time(), realm, random.random())).hexdigest() www_auth = STALE_WWW_AUTH % (realm, nonce) else: nonce = md5.new("%d:%s:%.6f" % (time.time(), realm, random.random())).hexdigest() www_auth = STD_WWW_AUTH % (realm, nonce) cherrypy.response.headers['www-authenticate'] = www_auth # Record the new nonce record_nonce(nonce, refresh, expires) raise cherrypy.HTTPError(401, "You are not authorized to access that resource") cherrypy.tools.restful_auth = cherrypy.Tool('on_start_resource', restful_auth) }}} '''Test Cases''' {{{ """ Test cases for the restful_auth tool """ from cherrypy.test import test test.prefer_parent_path() import os import time import cherrypy from cherrypy.lib import httpauth import tools def setup_server(): class Root: def index(self): return "This is public." index.exposed = True class Protected: def index(self): return "Hello %s, you've been authorized." % cherrypy.request.login index.exposed = True class Roles: def index(self, *args, **kwargs): if cherrypy.request.method == 'GET': return "Hello %s, you've been authorized to GET." \ % cherrypy.request.login elif cherrypy.request.method == 'POST': return "Hello %s, you've been authorized to POST." \ % cherrypy.request.login else: raise cherrypy.HTTPError(405) index.exposed = True class Protected3: def index(self): return "Hello %s, you've been authorized." % cherrypy.request.login index.exposed = True class Protected4: def index(self): return "Hello %s, you've been authorized." % cherrypy.request.login index.exposed = True class Logout: def index(self): return "You are now logged out." index.exposed = True test_roles = {'test': ('test#', 'role1', 'role3'), 'postman': ('postman#', 'role2', 'role3')} def fetch_user(user): """ Return the user's password and login roles """ return test_roles.get(user) conf = {'/': {'tools.restful_auth.on': False, 'tools.restful_auth.realm': 'localhost', 'tools.restful_auth.users': fetch_user, 'tools.restful_auth.refresh': 30, 'tools.restful_auth.expires': 60}, '/digest': { 'tools.restful_auth.on': True,}, '/refresh': { 'tools.restful_auth.on': True, 'tools.restful_auth.refresh': 1, 'tools.restful_auth.expires': 3}, '/onetime': { 'tools.restful_auth.on': True, 'tools.restful_auth.expires': 0}, '/logout': {'tools.restful_auth.on': True, 'tools.restful_auth.logout': True}, '/roles': {'tools.restful_auth.on': True, 'tools.restful_auth.roles': {'GET': ('role1', 'role3'), 'POST': ('role2',)}}, } root = Root() root.digest = Protected() root.refresh = Protected() root.onetime = Protected() root.roles = Roles() root.logout = Logout() cherrypy.tree.mount(root, config=conf) cherrypy.config.update({'environment': 'test_suite', 'log.error_file': os.path.join(os.path.dirname(__file__), 'cp_error_log'), }) from cherrypy.test import helper class RESTWebCase(helper.CPWebCase): """ Add extra methods for our test cases """ def get_nonce(self, msg = None): """ Extract the nonce from the www-authenticate request header """ www_auth = None for k, v in self.headers: if k.lower() == "www-authenticate": www_auth = v break if www_auth is None: self._handlewebError( "Check nonce: www-authenticate header not found: %s" % msg) return nonce = None www_auth = www_auth[7:] items = www_auth.split(', ') for item in items: key, value = item.split('=') if key.lower() == 'nonce': nonce = value.strip('"') break if nonce is None: self._handlewebError( "Nonce was not found in www-authenticate header: %s" % msg) return nonce def calc_auth_hdr(self, nonce, path, user, password, method='GET'): """ Return the authenticate header string """ base_auth = 'Digest username="%s", realm="localhost", nonce="%s", uri=%s, algorithm=MD5, response="%s", qop=auth, nc=%s, cnonce="1522e61005789929"' auth = base_auth % (user, nonce, path, '', '00000001') params = httpauth.parseAuthorization(auth) response = httpauth._computeDigestResponse(params, password, method) return base_auth % (user, nonce, path, response, '00000001') def assertStale(self, msg = None): """ Check that the stale flag is set in the www-authenticate header """ www_auth = None for k, v in self.headers: if k.lower() == "www-authenticate": www_auth = v break if www_auth is None: self._handlewebError( "Checking stale flag on: : www-authenticate header not found", msg) stale = None www_auth = www_auth[7:] items = www_auth.split(', ') for item in items: key, value = item.split('=') if key.lower() == 'stale': stale = value.strip('"') break if stale is None: self._handlewebError( "Stale flag not found in www-authenticate header", msg) if stale != 'true': self._handlewebError( "Stale flag not set to 'true' in www-authenticate header") return def assertNotStale(self, msg = None): """ Check that the stale flag is not set in the www-authenticate header """ www_auth = None for k, v in self.headers: if k.lower() == "www-authenticate": www_auth = v break if www_auth is None: self._handlewebError( "Checking stale flag off: : www-authenticate header not found", msg) stale = None www_auth = www_auth[7:] items = www_auth.split(', ') for item in items: key, value = item.split('=') if key.lower() == 'stale': stale = value.strip('"') break if stale: self._handlewebError( "Stale flag was found in www-authenticate header", msg) return class RESTAuthTest(RESTWebCase): def testBasicRestfulDigest(self): """ Basic restful auth test including test of digest auth """ path = "/digest/" self.getPage(path) self.assertStatus(401, 'Basic test first request') value = None for k, v in self.headers: if k.lower() == "www-authenticate": if v.startswith("Digest"): value = v break if value is None: self._handlewebError("Digest authentification scheme was not found") value = value[7:] items = value.split(', ') tokens = {} for item in items: key, value = item.split('=') tokens[key.lower()] = value missing_msg = "%s is missing" bad_value_msg = "'%s' was expecting '%s' but found '%s'" nonce = None if 'realm' not in tokens: self._handlewebError(missing_msg % 'realm') elif tokens['realm'] != '"localhost"': self._handlewebError(bad_value_msg % ('realm', '"localhost"', tokens['realm'])) if 'nonce' not in tokens: self._handlewebError(missing_msg % 'nonce') else: nonce = tokens['nonce'].strip('"') if 'algorithm' not in tokens: self._handlewebError(missing_msg % 'algorithm') elif tokens['algorithm'] != '"MD5"': self._handlewebError(bad_value_msg % ('algorithm', '"MD5"', tokens['algorithm'])) if 'qop' not in tokens: self._handlewebError(missing_msg % 'qop') elif tokens['qop'] != '"auth"': self._handlewebError(bad_value_msg % ('qop', '"auth"', tokens['qop'])) auth = self.calc_auth_hdr(nonce, path, 'test', 'test#') msg = 'Basic test after authentication' self.getPage(path, [('Authorization', auth)]) self.assertStatus('200 OK', msg) self.assertBody("Hello test, you've been authorized.", msg) def testPublic(self): """ Public access """ self.getPage("/") msg = 'Public test' self.assertStatus('200 OK', msg) self.assertHeader('Content-Type', 'text/html', msg) self.assertBody('This is public.', msg) def testRefresh(self): """ Test refreshing nonce """ path = "/refresh/" self.getPage(path) self.assertStatus(401, 'Refresh test first request') nonce1 = self.get_nonce('Refresh test first nonce') auth = self.calc_auth_hdr(nonce1, path, 'test', 'test#') msg = 'Refresh test first authenticated request' self.getPage(path, [('Authorization', auth)]) self.assertStatus('200 OK', msg) self.assertBody("Hello test, you've been authorized.", msg) # Wait two seconds, then check we get a new nonce with the stale flag time.sleep(2) self.getPage(path, [('Authorization', auth)]) msg = 'Refresh test - nonce requires refresh request' self.assertStatus(401, msg) self.assertStale(msg) nonce2 = self.get_nonce('Refresh test second nonce') self.assertNotEqual(nonce1, nonce2) auth = self.calc_auth_hdr(nonce2, path, 'test', 'test#') msg = 'Refresh test - refreshed nonce request' self.getPage(path, [('Authorization', auth)]) self.assertStatus('200 OK', msg) self.assertBody("Hello test, you've been authorized.", msg) # Wait four seconds, then check we get a 401 without the stale flag # and with a new nonce time.sleep(4) self.getPage(path, [('Authorization', auth)]) msg = 'Refresh test - expired nonce request' self.assertStatus(401, msg) self.assertNotStale(msg) nonce3 = self.get_nonce('Refresh test third nonce') self.assertNotEqual(nonce3, nonce2) def testOneTime(self): """ Test one-time nonce """ path = "/onetime/" self.getPage(path) self.assertStatus(401, 'One-time test first request') nonce = self.get_nonce('One-time test nonce') auth = self.calc_auth_hdr(nonce, path, 'test', 'test#') self.getPage(path, [('Authorization', auth)]) msg = 'One-time test - THE request' self.assertStatus('200 OK', msg) self.assertBody("Hello test, you've been authorized.", msg) # Now check the nonce has expired self.getPage(path, [('Authorization', auth)]) self.assertStatus(401, 'One-time request on expired nonce') def testLogout(self): """ Test logout """ # First authenticate using the /digest path path = "/digest/" self.getPage(path) self.assertStatus(401, 'Logout test first request') nonce = self.get_nonce('Logout test nonce') auth = self.calc_auth_hdr(nonce, path, 'test', 'test#') self.getPage(path, [('Authorization', auth)]) msg = 'Logout test after authentication' self.assertStatus('200 OK', msg) self.assertBody("Hello test, you've been authorized.", msg) # Now logout and check we get a 401 on a subsequent access to /digest path = '/logout/' msg = 'Logout test - request to Logout page' self.getPage('/logout/', [('Authorization', auth)]) self.assertStatus('200 OK', msg) self.assertBody("You are now logged out.", msg) self.getPage('/digest/', [('Authorization', auth)]) self.assertStatus(401, 'Logout test - request after logout') def testRoles(self): """ Test login roles """ # GET a page as 'test' path = "/roles/" self.getPage(path) self.assertStatus(401, "Role test first request as 'test'") nonce = self.get_nonce("'Role test nonce for 'test'") auth = self.calc_auth_hdr(nonce, path, 'test', 'test#') msg = "Role test authorised GET as 'test'" self.getPage(path, [('Authorization', auth)]) self.assertStatus('200 OK', msg) self.assertBody("Hello test, you've been authorized to GET.", msg) # POST some data as 'test' msg = "Role test un-authorised POST as 'test'" self.getPage(path, [('Authorization', auth)], 'POST', 'some data') self.assertStatus(401, msg) # Try again with the correct method in the response nonce = self.get_nonce("'Role test POST nonce for 'test'") auth = self.calc_auth_hdr(nonce, path, 'test', 'test#', 'POST') msg = "Role test 'authorised' POST as 'test'" self.getPage(path, [('Authorization', auth)], 'POST', 'some data') self.assertStatus(401, msg) # POST some data as 'postman' auth = self.calc_auth_hdr(nonce, path, 'postman', 'postman#') self.getPage(path, [('Authorization', auth)], 'POST', 'some data') self.assertStatus(401, "Role test first POST as 'postman'") nonce = self.get_nonce("Role test nonce for 'postman'") auth = self.calc_auth_hdr(nonce, path, 'postman', 'postman#', 'POST') self.getPage(path, [('Authorization', auth)], 'POST', 'some data') msg = "Role test authorised POST as 'postman'" self.assertStatus('200 OK', msg) self.assertBody("Hello postman, you've been authorized to POST.", msg) if __name__ == "__main__": setup_server() helper.testmain() }}}