Lots of people have asked me when the django-openid package will provide tools for running an OpenID provider (in addition to an OpenID consumer). The answer is "when it's done", but since it's such a common request I've decided to post some example code to help people who want to work it out for themselves.
This is the openidserver.py file from idproxy.net. It is incomplete - the urlconf, models, templates and some utility functions are not included. In other words, it's useless for anything other than providing a few hints as to how you can go about implementing a provider.
Nonetheless, it's better than nothing. I hope this will prove a useful example for people trying to figure out how to best integrate the JanRain Python OpenID library with Django to build an OpenID provider.
| from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import render_to_response
from models import *
OPENID_FILESTORE = '/tmp/openid-filestore'
from openid.store.filestore import FileOpenIDStore
from openid.server.server import Server
from idproxynet.idproxy.views import yhash_from_cookie
from idproxynet import nonceprotect
import datetime, pickle
def openid_server(req):
    """
    The OpenID server endpoint - the view running at the URL that the
    <link rel="openid.server"> tag points to.

    Decodes the incoming OpenID request and then either answers it directly,
    hands it to the library's generic handler, or shows the decide page so
    the user can choose whether to trust the requesting site.
    """
    server = Server(FileOpenIDStore(OPENID_FILESTORE))

    # Drop any AuthorizationInfo left in the session by an earlier request
    if req.session.get('AuthorizationInfo', None):
        del req.session['AuthorizationInfo']

    params = dict(req.REQUEST.items())
    orequest = server.decodeRequest(params)
    if not orequest:
        # Nothing decodable as an OpenID request - show a plain notice
        return HttpResponse("This is an OpenID server")

    if orequest.mode not in ("checkid_immediate", "checkid_setup"):
        # Every other mode is passed to the library's own handler
        oresponse = server.handleRequest(orequest)
    elif openid_is_authorized(req, orequest.identity, orequest.trust_root):
        oresponse = orequest.answer(True)
    elif orequest.immediate:
        # Immediate mode cannot show a page, so answer "no" and pass the
        # server URL along with the refusal
        oresponse = orequest.answer(False, 'http://idproxy.net/openid/server/')
    else:
        return openid_decide_page(req, orequest)

    return django_response(server.encodeResponse(oresponse))
def django_response(webresponse):
    """
    Convert a webresponse from the OpenID library in to a Django HttpResponse.

    The body, the status code and every header of the webresponse are copied
    across to the new response.
    """
    django_resp = HttpResponse(webresponse.body)
    django_resp.status_code = webresponse.code
    for header_name, header_value in webresponse.headers.items():
        django_resp[header_name] = header_value
    return django_resp
def pending_request(req, id):
    """
    Let a signed-in user continue a pending request - i.e. an OpenID request
    that was stashed in the PendingRequest table because they were not
    authenticated with the site when it arrived.

    `id` is the primary key of the PendingRequest row; the row must belong to
    an OpenID owned by the signed-in user.
    """
    yhash = yhash_from_cookie(req)
    if not yhash:
        return error_page(req, "You are not signed in")

    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")

    # Only look at pending requests attached to one of this user's OpenIDs
    try:
        pendingrequest = PendingRequest.objects.get(
            id = id, openid__user = yahoouser
        )
    except PendingRequest.DoesNotExist:
        return error_page(req, "Pending request does not exist")

    # NOTE(review): unpickled() is defined on the model, outside this file -
    # presumably it reverses the pickle.dumps() done in landing_page; confirm.
    orequest = pendingrequest.unpickled()

    if not openid_is_authorized(req, orequest.identity, orequest.trust_root):
        # The trust_root is not trusted yet - ask the user to decide
        return openid_decide_page(req, orequest, pendingrequest.id)

    # The trust_root is already trusted: delete the pending request and log
    # them straight in
    pendingrequest.delete()
    oresponse = orequest.answer(True)
    server = Server(FileOpenIDStore(OPENID_FILESTORE))
    return django_response(server.encodeResponse(oresponse))
def openid_decide_page(req, orequest, pending_id = None):
    """
    The page that asks the user if they really want to sign in to the site, and
    lets them add the consumer to their trusted whitelist.

    req        -- the current Django request
    orequest   -- the decoded OpenID request being decided on
    pending_id -- id of the PendingRequest this decision belongs to, if any;
                  it is passed through to the template

    Returns the landing page when the user is not signed in, an error page
    when the signed-in user is invalid or does not own the requested
    identity, and otherwise the rendered 'decide.html' template.
    """
    # If they are NOT logged in, show the landing page
    yhash = yhash_from_cookie(req)
    if not yhash:
        return landing_page(req, orequest)
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")
    # Check that the user owns the requested identity
    subdomain = orequest.identity.replace('http://', '').replace(
        '.idproxy.net/', '')
    try:
        yahoouser.openid_set.get(subdomain = subdomain)
    except OpenId.DoesNotExist:
        return error_page(req, "You are signed in to idproxy.net but do " + \
            "not own the requested OpenID")
    # They are logged in - ask if they want to trust this root.
    # The request is stashed in the session so that openid_decide_post can
    # pick it up again when the form is submitted.
    req.session['orequest'] = orequest
    return render_to_response('decide.html', {
        'title': 'Trust this site?',
        'djnonce': nonceprotect.new_nonce_for_path_and_yhash('/decide/', yhash),
        'trust_root': orequest.trust_root,
        'identity': orequest.identity,
        'yahoouser': yahoouser,
        'pending_id': pending_id,
    })
def error_page(req, msg):
    """
    Render the 'error.html' template with the given message.

    `req` is accepted for symmetry with the other views but is not used.
    """
    context = {
        'title': 'Error',
        'msg': msg,
    }
    return render_to_response('error.html', context)
def openid_decide_post(req):
    """
    The user has submitted the openid_decide_page.

    Re-checks the signed-in user, the form nonce and ownership of the
    requested identity, then either redirects to the OpenID request's cancel
    URL or answers the stashed request positively. When 'always' was
    submitted, the trust_root is saved to the user's trusted roots first.

    A request without POST data, or without an OpenID request stashed in the
    session, gets an error page. These used to be assert statements, which
    are skipped entirely when Python runs with -O and otherwise surface as an
    unhandled AssertionError.
    """
    if not req.POST:
        return error_page(req, "POST required for this page")
    orequest = req.session.get('orequest', None)
    if not orequest:
        return error_page(
            req, "No OpenID request in progress; please start again"
        )
    yhash = yhash_from_cookie(req)
    if not yhash:
        return error_page(req, "You are not signed in")
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")
    try:
        nonceprotect.validate_nonce_against_path_and_yhash(
            req.POST.get('djnonce', ''), req.path, yhash
        )
    except nonceprotect.BadNonce:
        return error_page(req, "Your form has expired; hit back and try again")
    # Ensure that auth_user owns the requested domain
    try:
        subdomain = orequest.identity.replace(
            'http://', '').replace('.idproxy.net/', '')
        openid = yahoouser.openid_set.get(subdomain=subdomain)
    except OpenId.DoesNotExist:
        return error_page(req, "You do not own that OpenID")
    # If there was an associated pending request, delete it. filter() is
    # used instead of get() so that a stale or forged pending_id deletes
    # nothing rather than raising PendingRequest.DoesNotExist.
    pending_id = req.POST.get('pending_id', None)
    if pending_id:
        PendingRequest.objects.filter(
            id = pending_id,
            openid = openid
        ).delete()
    if req.POST.get('cancel', None):
        # Cancel their request
        del req.session['orequest']
        return HttpResponseRedirect(orequest.getCancelURL())
    if req.POST.get('always', None):
        # Save this to their trust roots
        openid.trustedroot_set.create(
            trust_root = orequest.trust_root,
            created = datetime.datetime.now()
        )
    # Redirect and say yes
    oresponse = orequest.answer(True)
    server = Server(FileOpenIDStore(OPENID_FILESTORE))
    webresponse = server.encodeResponse(oresponse)
    # Add this to their history
    OpenIdHistory.objects.create(
        openid = openid,
        trust_root = orequest.trust_root
    )
    return django_response(webresponse)
def landing_page(req, orequest):
    """
    The page shown when the user attempts to sign in somewhere using OpenID
    but is not authenticated with the site.

    The OpenID request is pickled and stored as a pending request against the
    requested OpenID, so it can be completed once they have logged in. The
    'landing.html' template is then rendered.
    """
    # Reduce the identity URL to the subdomain used to look up the OpenID
    without_scheme = orequest.identity.replace('http://', '')
    subdomain = without_scheme.replace('.idproxy.net/', '')

    try:
        openid = OpenId.objects.get(subdomain = subdomain)
    except OpenId.DoesNotExist:
        return error_page(req, 'That OpenID does not exist')

    # Stash the details of the request for later
    openid.pendingrequest_set.create(
        trust_root = orequest.trust_root,
        orequest = pickle.dumps(orequest),
    )

    context = {'title': 'You need to sign in'}
    return render_to_response('landing.html', context)
def openid_cancel(request):
    """
    Called when the user cancels a pending OpenID auth request.

    Clears AuthorizationInfo from the session if it is set, then redirects to
    the URL given in the 'n' query parameter (defaulting to '/').
    """
    session = request.session
    if session.get('AuthorizationInfo', None):
        del session['AuthorizationInfo']

    # NOTE(review): the redirect target is taken straight from the query
    # string with no validation - confirm this cannot be abused as an open
    # redirect.
    target = request.GET.get('n', '/')
    return HttpResponseRedirect(target)
import urlparse
def openid_is_authorized(req, identity_url, trust_root):
    """
    Check that the signed-in user owns the given identity URL, and that the
    trust_root is in their whitelist of trusted sites.

    Returns True only when both checks pass; in that case an OpenIdHistory
    row is also recorded for the OpenID and trust_root. Returns False in
    every other case, including when nobody is signed in.
    """
    # The subdomain is the first dot-separated label of the URL's host part.
    # NOTE(review): the rest of the host name is not checked here - confirm
    # callers only ever pass identities under the site's own domain.
    host = urlparse.urlparse(identity_url)[1]
    subdomain = host.split('.', 1)[0]

    yhash = yhash_from_cookie(req)
    if not yhash:
        return False

    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return False

    # Ensure that the user owns the requested domain
    try:
        openid = yahoouser.openid_set.get(subdomain=subdomain)
    except OpenId.DoesNotExist:
        return False

    trusted_count = openid.trustedroot_set.filter(trust_root = trust_root).count()
    if not trusted_count:
        # They don't trust this root yet
        return False

    # Add this to their history
    OpenIdHistory.objects.create(openid=openid, trust_root=str(trust_root))
    return True
|
More like this
- Template tag - list punctuation for a list of items by shapiromatron 10 months, 1 week ago
- JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 10 months, 2 weeks ago
- Serializer factory with Django Rest Framework by julio 1 year, 5 months ago
- Image compression before saving the new model / work with JPG, PNG by Schleidens 1 year, 6 months ago
- Help text hyperlinks by sa2812 1 year, 6 months ago
Comments
Please log in before commenting.