Lots of people have asked me when the django-openid package will provide tools for running an OpenID provider (in addition to an OpenID consumer). The answer is "when it's done", but since it's such a common request I've decided to post some example code to help people who want to work it out for themselves.
This is the openidserver.py file from idproxy.net. It is incomplete - the urlconf, models, templates and some utility functions are not included. In other words, it's useless for anything other than providing a few hints as to how you can go about implementing a provider.
Nonetheless, it's better than nothing. I hope this will prove a useful example for people trying to figure out how to best integrate the JanRain Python OpenID library with Django to build an OpenID provider.
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import render_to_response
from models import *
OPENID_FILESTORE = '/tmp/openid-filestore'
from openid.store.filestore import FileOpenIDStore
from openid.server.server import Server
from idproxynet.idproxy.views import yhash_from_cookie
from idproxynet import nonceprotect
import datetime, pickle
def openid_server(req):
    """
    The OpenID server endpoint itself - the view served at the URL that the
    <link rel="openid.server"> tag points to.

    The OpenID request is decoded from req.REQUEST. When nothing decodes, a
    plain text placeholder response is returned. checkid_immediate and
    checkid_setup requests are answered positively if openid_is_authorized()
    approves them; otherwise immediate requests are answered negatively and
    setup requests are handed to openid_decide_page(). Any other mode is
    passed to the library's handleRequest().
    """
    openid_srv = Server(FileOpenIDStore(OPENID_FILESTORE))

    # Drop the AuthorizationInfo session var if one is set
    if req.session.get('AuthorizationInfo', None):
        del req.session['AuthorizationInfo']

    params = dict(req.REQUEST.items())
    orequest = openid_srv.decodeRequest(params)
    if not orequest:
        return HttpResponse("This is an OpenID server")

    if orequest.mode not in ("checkid_immediate", "checkid_setup"):
        # Not an authentication request - let the library deal with it
        oresponse = openid_srv.handleRequest(orequest)
    elif openid_is_authorized(req, orequest.identity, orequest.trust_root):
        oresponse = orequest.answer(True)
    elif orequest.immediate:
        oresponse = orequest.answer(False, 'http://idproxy.net/openid/server/')
    else:
        # Setup mode and not yet authorized: the user has to decide
        return openid_decide_page(req, orequest)

    return django_response(openid_srv.encodeResponse(oresponse))
def django_response(webresponse):
    """
    Turn a webresponse from the OpenID library into a Django HttpResponse,
    carrying over its body, status code and every header.
    """
    django_resp = HttpResponse(webresponse.body)
    django_resp.status_code = webresponse.code
    for header_name, header_value in webresponse.headers.items():
        django_resp[header_name] = header_value
    return django_resp
def pending_request(req, id):
    """
    Let a user pick up a pending request again - i.e. an OpenID request that
    was stored in the PendingRequest table because they weren't
    authenticated with the site when it arrived.

    An error page is rendered when the visitor is not signed in, their
    cookie hash matches no YahooUser, or no pending request with this id
    belongs to one of their OpenIDs. If the trust_root is already trusted
    the pending request is deleted and a positive answer is returned;
    otherwise the decide page is shown.
    """
    yhash = yhash_from_cookie(req)
    if not yhash:
        return error_page(req, "You are not signed in")

    try:
        yahoouser = YahooUser.objects.get(yahoo_hash=yhash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")

    # The lookup is restricted to requests attached to this user's OpenIDs
    try:
        stashed = PendingRequest.objects.get(id=id, openid__user=yahoouser)
    except PendingRequest.DoesNotExist:
        return error_page(req, "Pending request does not exist")

    orequest = stashed.unpickled()

    if not openid_is_authorized(req, orequest.identity, orequest.trust_root):
        # trust_root is not trusted yet, so ask them via the decide page
        return openid_decide_page(req, orequest, stashed.id)

    # Already trusted: remove the pending request, then log them straight in
    stashed.delete()
    approval = orequest.answer(True)
    openid_srv = Server(FileOpenIDStore(OPENID_FILESTORE))
    return django_response(openid_srv.encodeResponse(approval))
def openid_decide_page(req, orequest, pending_id = None):
    """
    The page that asks the user if they really want to sign in to the site,
    and lets them add the consumer to their trusted whitelist.

    req is the Django request, orequest the decoded OpenID request and
    pending_id the id of an associated PendingRequest, if any (it is passed
    through to the template).

    Visitors without a sign-in cookie hash get the landing page instead. An
    error page is rendered when the hash matches no YahooUser or when that
    user does not own the requested identity. Otherwise orequest is stashed
    in the session under 'orequest' and decide.html is rendered.
    """
    # If they are NOT logged in, show the landing page
    yhash = yhash_from_cookie(req)
    if not yhash:
        return landing_page(req, orequest)
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")

    # Check that the user owns the requested identity
    subdomain = orequest.identity.replace('http://', '').replace(
        '.idproxy.net/', '')
    try:
        yahoouser.openid_set.get(subdomain = subdomain)
    except OpenId.DoesNotExist:
        return error_page(req, "You are signed in to idproxy.net but do "
            "not own the requested OpenID")

    # They are logged in - ask if they want to trust this root. The request
    # is kept in the session so the form handler can answer it later.
    req.session['orequest'] = orequest
    return render_to_response('decide.html', {
        'title': 'Trust this site?',
        'djnonce': nonceprotect.new_nonce_for_path_and_yhash('/decide/', yhash),
        'trust_root': orequest.trust_root,
        'identity': orequest.identity,
        'yahoouser': yahoouser,
        'pending_id': pending_id,
    })
def error_page(req, msg):
    """
    Render error.html with the title 'Error' and the given message. The
    req argument is accepted but not used.
    """
    context = {'title': 'Error', 'msg': msg}
    return render_to_response('error.html', context)
def openid_decide_post(req):
    """
    The user has submitted the openid_decide_page form.

    Reads the OpenID request stashed in the session under 'orequest',
    checks that the visitor is signed in as a valid YahooUser, validates the
    form nonce and confirms that the user owns the requested identity. Any
    failed check renders an error page.

    POST fields used: 'djnonce' (form nonce), 'pending_id' (optional id of a
    PendingRequest to delete), 'cancel' (redirect to the consumer's cancel
    URL) and 'always' (save the trust_root as trusted before answering).
    Unless cancelled, the request is answered positively, an OpenIdHistory
    row is recorded and the encoded response is returned.
    """
    # Explicit checks instead of assert: asserts are stripped under
    # "python -O" and otherwise turn a user-level mistake into a 500.
    if not req.POST:
        return error_page(req, "POST required for this page")
    orequest = req.session.get('orequest', None)
    if not orequest:
        return error_page(req, "Should have orequest stashed in their session")

    yhash = yhash_from_cookie(req)
    if not yhash:
        return error_page(req, "You are not signed in")
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")

    try:
        nonceprotect.validate_nonce_against_path_and_yhash(
            req.POST.get('djnonce', ''), req.path, yhash
        )
    except nonceprotect.BadNonce:
        return error_page(req, "Your form has expired; hit back and try again")

    # Ensure that auth_user owns the requested domain
    try:
        subdomain = orequest.identity.replace(
            'http://', '').replace('.idproxy.net/', '')
        openid = yahoouser.openid_set.get(subdomain=subdomain)
    except OpenId.DoesNotExist:
        return error_page(req, "You do not own that OpenID")

    # If there was an associated pending request, delete it. filter() is
    # used rather than get() so that an id which no longer matches a row
    # (already deleted, or not attached to this OpenID) is simply ignored
    # instead of raising DoesNotExist.
    pending_id = req.POST.get('pending_id', None)
    if pending_id:
        PendingRequest.objects.filter(
            id = pending_id,
            openid = openid
        ).delete()

    if req.POST.get('cancel', None):
        # Cancel their request
        del req.session['orequest']
        return HttpResponseRedirect(orequest.getCancelURL())

    if req.POST.get('always', None):
        # Save this to their trust roots
        openid.trustedroot_set.create(
            trust_root = orequest.trust_root,
            created = datetime.datetime.now()
        )

    # Redirect and say yes
    oresponse = orequest.answer(True)
    server = Server(FileOpenIDStore(OPENID_FILESTORE))
    webresponse = server.encodeResponse(oresponse)

    # Add this to their history
    OpenIdHistory.objects.create(
        openid = openid,
        trust_root = orequest.trust_root
    )
    return django_response(webresponse)
def landing_page(req, orequest):
    """
    Shown when somebody tries to sign in somewhere with OpenID while not
    being authenticated with this site. For idproxy.net the page displays a
    message telling them to log in manually.

    Before rendering, the request is pickled and stored as a pending request
    on the matching OpenId so the auth can be completed after they log in.
    An error page is rendered if no OpenId has the requested subdomain.
    """
    without_scheme = orequest.identity.replace('http://', '')
    subdomain = without_scheme.replace('.idproxy.net/', '')

    try:
        openid = OpenId.objects.get(subdomain=subdomain)
    except OpenId.DoesNotExist:
        return error_page(req, 'That OpenID does not exist')

    # Stash the details needed to finish the auth later
    trust_root = orequest.trust_root
    pickled_request = pickle.dumps(orequest)
    openid.pendingrequest_set.create(
        trust_root=trust_root,
        orequest=pickled_request,
    )

    context = {'title': 'You need to sign in'}
    return render_to_response('landing.html', context)
def openid_cancel(request):
    """
    Called when the user cancels a pending OpenID auth request. Removes
    AuthorizationInfo from the session if it is set, then redirects to the
    URL given in the 'n' query parameter, or to '/' when there is none.
    """
    session = request.session
    if session.get('AuthorizationInfo', None):
        del session['AuthorizationInfo']
    # NOTE(review): 'n' comes straight from the query string with no
    # validation, so this redirects wherever the link says - confirm that
    # is intended.
    destination = request.GET.get('n', '/')
    return HttpResponseRedirect(destination)
import urlparse
def openid_is_authorized(req, identity_url, trust_root):
    """
    Check that they own the given identity URL, and that the trust_root is
    in their whitelist of trusted sites.

    Returns False when the identity URL's host is not of the form
    <subdomain>.idproxy.net, when the visitor has no sign-in cookie hash,
    when the hash matches no YahooUser, when that user does not own the
    subdomain, or when the trust_root is not among that OpenID's trusted
    roots. Returns True otherwise, after recording an OpenIdHistory row.
    """
    # The host must really be <subdomain>.idproxy.net. Looking only at the
    # first label of the host would treat e.g. http://foo.elsewhere.example/
    # as the OpenID "foo"; the other views in this module already insist on
    # the .idproxy.net host.
    suffix = '.idproxy.net'
    domain = urlparse.urlparse(identity_url)[1]
    if not domain.lower().endswith(suffix):
        return False
    subdomain = domain[:-len(suffix)]
    if not subdomain:
        return False

    yhash = yhash_from_cookie(req)
    if not yhash:
        return False
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return False

    # Ensure that auth_user owns the requested domain
    try:
        openid = yahoouser.openid_set.get(subdomain=subdomain)
    except OpenId.DoesNotExist:
        return False

    if not openid.trustedroot_set.filter(trust_root = trust_root).count():
        # They don't trust this root yet
        return False

    # Add this to their history
    OpenIdHistory.objects.create(openid=openid, trust_root=str(trust_root))
    return True
|
More like this
- Template tag - list punctuation for a list of items by shapiromatron 8 months, 1 week ago
- JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 8 months, 2 weeks ago
- Serializer factory with Django Rest Framework by julio 1 year, 3 months ago
- Image compression before saving the new model / work with JPG, PNG by Schleidens 1 year, 4 months ago
- Help text hyperlinks by sa2812 1 year, 4 months ago
Comments
Please login first before commenting.