- Author:
- zbyte64
- Posted:
- August 1, 2011
- Language:
- Python
- Version:
- Not specified
- Score:
- 1 (after 1 ratings)
TaskViewMixin can be mixed into a class-based view to handle the scheduling and polling of a Celery task. While the task is executing, a waiting page is rendered; that page should refresh itself so the task is polled again. Once the task completes, the view renders a success page and can use the task's result payload when rendering it. If the task fails, a failure page is rendered instead.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | from django.views.generic import TemplateView
from celery.result import AsyncResult
from celery.registry import tasks
class BaseResponseView(TemplateView):
    """Template view that exposes a task result to its template.

    The ``result`` attribute (``None`` unless overridden on the class or
    supplied as an init keyword) is published in the template context under
    the key ``'result'``.
    """

    # Object made available to the template as ``result``.
    result = None

    def get_context_data(self, **kwargs):
        """Return the parent's context with ``result`` added to it."""
        template_context = super(BaseResponseView, self).get_context_data(
            **kwargs
        )
        template_context['result'] = self.result
        return template_context
class WaitForResponseView(BaseResponseView):
    """Page rendered while the task has neither succeeded nor failed."""

    template_name = 'tasks/wait_for_response.html'
class TaskFailedResponseView(BaseResponseView):
    """Page rendered when the task reports a failure."""

    template_name = 'tasks/task_failed_response.html'
class TaskViewMixin(object):
    """Mixin for a class-based view that schedules a task and polls it.

    On a GET request the mixin looks up a task id stored in the session.
    If there is none (or the ``recreate`` query parameter is truthy) the
    task named by ``task_name`` is scheduled and its id is remembered in
    the session. Depending on the task's state the request is then answered
    with the failure view, the success template of the host view, or the
    waiting view.

    The host view must provide ``self.request`` and ``render_to_response``
    (e.g. by also inheriting from ``TemplateView``) and must set
    ``task_name``.
    """

    # Session key under which the id of the scheduled task is stored.
    session_key = 'asynctask_token'
    # View classes used for the "still running" and "failed" pages.
    wait_for_response_view = WaitForResponseView
    task_failed_response_view = TaskFailedResponseView
    # Name used to look the task up in the task registry.
    task_name = None  # please define this

    def get_task_kwargs(self):
        """Return the keyword arguments the task is scheduled with."""
        return {}

    def schedule_task(self):
        """Schedule the task asynchronously and remember its id.

        Returns the result object returned by ``apply_async``.
        """
        task = tasks[self.task_name]
        kwargs = self.get_task_kwargs()
        result = task.apply_async(kwargs=kwargs)
        self.set_task_token(result.task_id)
        return result

    def get_task_token(self):
        """Return the task id stored in the session, or ``None``."""
        return self.request.session.get(self.session_key, None)

    def set_task_token(self, task_id):
        """Store ``task_id`` in the session."""
        self.request.session[self.session_key] = task_id

    def clear_task_token(self):
        """Forget the stored task id; a missing id is not an error."""
        # pop() instead of del: the key may already be gone (expired
        # session, or another poll of the same task cleared it first).
        self.request.session.pop(self.session_key, None)

    def task_status(self):
        """Return an ``AsyncResult`` for the stored task id, or ``None``."""
        task_id = self.get_task_token()
        if task_id is None:
            return None
        return AsyncResult(task_id)

    def get(self, request, *args, **kwargs):
        """Schedule or poll the task and render the matching page.

        Positional and keyword URL arguments are accepted but unused.
        """
        result = self.task_status()
        # request.GET rather than request.REQUEST: this is a GET handler,
        # and REQUEST is not available on newer Django versions.
        if result is None or request.GET.get('recreate', False):
            result = self.schedule_task()
        if result.failed():
            self.clear_task_token()
            return self.render_task_failed_response(result)
        elif result.successful():
            self.clear_task_token()
            return self.render_task_success_response(result)
        else:
            return self.render_wait_for_response(result)

    def _dispatch_response_view(self, view_class, result):
        """Instantiate ``view_class`` with ``result`` and dispatch to it."""
        view = view_class(result=result)
        # dispatch() is called directly instead of going through as_view(),
        # so the instance has to be given the request by hand. Use the
        # view's own setup() hook when it has one; otherwise set the
        # attributes that hook would set.
        if hasattr(view, 'setup'):
            view.setup(self.request)
        else:
            view.request = self.request
            view.args = ()
            view.kwargs = {}
        return view.dispatch(self.request)

    def render_wait_for_response(self, result):
        """Render the waiting page for a task that has not finished."""
        return self._dispatch_response_view(
            self.wait_for_response_view, result
        )

    def render_task_failed_response(self, result):
        """Render the failure page for a task that failed."""
        return self._dispatch_response_view(
            self.task_failed_response_view, result
        )

    def render_task_success_response(self, result):
        """Render the host view's template with ``result`` in the context."""
        return self.render_to_response({'result': result})
class ExampleTaskView(TaskViewMixin, TemplateView):
    """Example view: runs the task registered as ``celery.ping``.

    ``template_name`` is the template rendered once the task has succeeded.
    """

    task_name = 'celery.ping'
    template_name = 'tasks/task_success.html'
# Callable produced by ExampleTaskView.as_view(); presumably meant to be
# referenced from a URLconf.
example_view = ExampleTaskView.as_view()
|
More like this
- Template tag - list punctuation for a list of items by shapiromatron 11 months, 2 weeks ago
- JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 11 months, 3 weeks ago
- Serializer factory with Django Rest Framework by julio 1 year, 6 months ago
- Image compression before saving the new model / work with JPG, PNG by Schleidens 1 year, 7 months ago
- Help text hyperlinks by sa2812 1 year, 8 months ago
Comments
Please login first before commenting.