Login

Partial OpenID provider implementation from idproxy.net

Author:
simon
Posted:
July 12, 2007
Language:
Python
Version:
.96
Score:
1 (after 1 ratings)

Lots of people have asked me when the django-openid package will provide tools for running an OpenID provider (in addition to an OpenID consumer). The answer is "when it's done", but since it's such a common request I've decided to post some example code to help people who want to work it out for themselves.

This is the openidserver.py file from idproxy.net. It is incomplete - the urlconf, models, templates and some utility functions are not included. In other words, it's useless for anything other than providing a few hints as to how you can go about implementing a provider.

Nonetheless, it's better than nothing. I hope this will prove a useful example for people trying to figure out how to best integrate the JanRain Python OpenID library with Django to build an OpenID provider.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import render_to_response
from models import *
OPENID_FILESTORE = '/tmp/openid-filestore'

from openid.store.filestore import FileOpenIDStore
from openid.server.server import Server

from idproxynet.idproxy.views import yhash_from_cookie
from idproxynet import nonceprotect
import datetime, pickle

def openid_server(req):
    """
    This view is the actual OpenID server - running at the URL pointed to by 
    the <link rel="openid.server"> tag. 
    """
    server = Server(FileOpenIDStore(OPENID_FILESTORE))
    # Clear AuthorizationInfo session var, if it is set
    if req.session.get('AuthorizationInfo', None):
        del req.session['AuthorizationInfo']
    querydict = dict(req.REQUEST.items())
    orequest = server.decodeRequest(querydict)
    if not orequest:
        return HttpResponse("This is an OpenID server")
    if orequest.mode in ("checkid_immediate", "checkid_setup"):
        if openid_is_authorized(req, orequest.identity,
            orequest.trust_root):
            oresponse = orequest.answer(True)
        elif orequest.immediate:
            oresponse = orequest.answer(False, 'http://idproxy.net/openid/server/')
        else:
            return openid_decide_page(req, orequest)
    else:
        oresponse = server.handleRequest(orequest)
    webresponse = server.encodeResponse(oresponse)
    return django_response(webresponse)

def django_response(webresponse):
    "Convert a webresponse from the OpenID library in to a Django HttpResponse"
    response = HttpResponse(webresponse.body)
    response.status_code = webresponse.code
    for key, value in webresponse.headers.items():
        response[key] = value
    return response

def pending_request(req, id):
    """
    This page allows users to continue a pending request - i.e. one that was 
    stashed away because they weren't authenticated with the site.
    """
    # Allows user to retry an OpenID response stashed in PendingRequest table
    yhash = yhash_from_cookie(req)
    if not yhash:
        return error_page(req, "You are not signed in")
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")
    try:
        pendingrequest = PendingRequest.objects.get(
            id = id,
            openid__user = yahoouser,
        )
    except PendingRequest.DoesNotExist:
        return error_page(req, "Pending request does not exist")
    orequest = pendingrequest.unpickled()
    # If they already trust the trust_root, log them straight in after deleting
    # the pending request
    if openid_is_authorized(req, orequest.identity, orequest.trust_root):
        pendingrequest.delete()
        oresponse = orequest.answer(True)
        server = Server(FileOpenIDStore(OPENID_FILESTORE))
        webresponse = server.encodeResponse(oresponse)
        return django_response(webresponse)
    # Show them the decide page
    return openid_decide_page(req, orequest, pendingrequest.id)

def openid_decide_page(req, orequest, pending_id = None):
    """
    The page that asks the user if they really want to sign in to the site, and
    lets them add the consumer to their trusted whitelist.
    """
    # If user is logged in, ask if they want to trust this trust_root
    # If they are NOT logged in, show the landing page
    yhash = yhash_from_cookie(req)
    if not yhash:
        return landing_page(req, orequest)
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")
    # Check that the user owns the requested identity
    subdomain = orequest.identity.replace('http://', '').replace(
        '.idproxy.net/', '')
    try:
        yahoouser.openid_set.get(subdomain = subdomain)
    except OpenId.DoesNotExist:
        return error_page(req, "You are signed in to idproxy.net but do " + \
            "not own the requested OpenID")
    # They are logged in - ask if they want to trust this root
    req.session['orequest'] = orequest
    return render_to_response('decide.html', {
        'title': 'Trust this site?',
        'djnonce': nonceprotect.new_nonce_for_path_and_yhash('/decide/', yhash),
        'trust_root': orequest.trust_root,
        'identity': orequest.identity,
        'yahoouser': yahoouser,
        'pending_id': pending_id,
    })
    import pprint
    from cgi import escape
    return HttpResponse(
        ("This is the page where you decide on identity %s page %s" % (
            orequest.identity, orequest.trust_root
        )) + escape(pprint.pformat(orequest.__dict__.items()))
    )

def error_page(req, msg):
    return render_to_response('error.html', {
        'title': 'Error',
        'msg': msg,
    })

def openid_decide_post(req):
    """
    The user has submitted the openid_decide_page
    """
    assert req.POST, "POST required for this page"
    orequest = req.session.get('orequest', None)
    assert orequest, "Should have orequest stashed in their session"
    yhash = yhash_from_cookie(req)
    if not yhash:
        return error_page(req, "You are not signed in")
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")
    try:
        nonceprotect.validate_nonce_against_path_and_yhash(
            req.POST.get('djnonce', ''), req.path, yhash
        )
    except nonceprotect.BadNonce:
        return error_page(req, "Your form has expired; hit back and try again")
    # Ensure that auth_user owns the requested domain
    try:
        subdomain = orequest.identity.replace(
            'http://', '').replace('.idproxy.net/', '')
        openid = yahoouser.openid_set.get(subdomain=subdomain)
    except OpenId.DoesNotExist:
        return error_page(req, "You do not own that OpenID")
    # If there was an associated pending request, delete it
    if req.POST.get('pending_id', None):
        PendingRequest.objects.get(
            id = req.POST.get('pending_id'),
            openid = openid
        ).delete()
    if req.POST.get('cancel', None):
        # Cancel their request
        del req.session['orequest']
        return HttpResponseRedirect(orequest.getCancelURL())
    if req.POST.get('always', None):
        # Save this to their trust roots
        openid.trustedroot_set.create(
            trust_root = orequest.trust_root,
            created = datetime.datetime.now()
        )
    # Redirect and say yes
    oresponse = orequest.answer(True)
    server = Server(FileOpenIDStore(OPENID_FILESTORE))
    webresponse = server.encodeResponse(oresponse)
    # Add this to their history
    OpenIdHistory.objects.create(
        openid = openid,
        trust_root = orequest.trust_root
    )
    return django_response(webresponse)

def landing_page(req, orequest):
    """
    The page shown when the user attempts to sign in somewhere using OpenID 
    but is not authenticated with the site. For idproxy.net, a message telling
    them to log in manually is displayed.
    """
    # stash details so when they have logged in they can complete the auth
    subdomain = orequest.identity.replace('http://', '').replace(
        '.idproxy.net/', '')
    try:
        openid = OpenId.objects.get(subdomain = subdomain)
    except OpenId.DoesNotExist:
        return error_page(req, 'That OpenID does not exist')
    openid.pendingrequest_set.create(
        trust_root = orequest.trust_root,
        orequest = pickle.dumps(orequest)
    )
    return render_to_response('landing.html', {
        'title': 'You need to sign in',
    })

def openid_cancel(request):
    "Called when the user cancels a pending OpenID auth request"
    if request.session.get('AuthorizationInfo', None):
        del request.session['AuthorizationInfo']
    return HttpResponseRedirect(request.GET.get('n', '/'))

import urlparse
def openid_is_authorized(req, identity_url, trust_root):
    """
    Check that they own the given identity URL, and that the trust_root is 
    in their whitelist of trusted sites.
    """
    domain = urlparse.urlparse(identity_url)[1]
    subdomain = domain.split('.')[0]
    yhash = yhash_from_cookie(req)
    if not yhash:
        return False
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return False
    # Ensure that auth_user owns the requested domain
    try:
        openid = yahoouser.openid_set.get(subdomain=subdomain)
    except OpenId.DoesNotExist:
        return False
    if openid.trustedroot_set.filter(
        trust_root = trust_root
    ).count() < 1:
        # They don't trust this root yet
        return False
    # Add this to their history
    OpenIdHistory.objects.create(openid=openid, trust_root=str(trust_root))
    return True

More like this

  1. Template tag - list punctuation for a list of items by shapiromatron 10 months, 1 week ago
  2. JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 10 months, 2 weeks ago
  3. Serializer factory with Django Rest Framework by julio 1 year, 5 months ago
  4. Image compression before saving the new model / work with JPG, PNG by Schleidens 1 year, 6 months ago
  5. Help text hyperlinks by sa2812 1 year, 6 months ago

Comments

Please login first before commenting.