Skip to content
Snippets Groups Projects
Verified Commit 4ac097e9 authored by Nik | Klampfradler's avatar Nik | Klampfradler
Browse files

Ensure task is enqueued on commit

parent d994a221
No related branches found
No related tags found
No related merge requests found
...@@ -55,11 +55,4 @@ class CeleryHaystack(AppConf): ...@@ -55,11 +55,4 @@ class CeleryHaystack(AppConf):
return data return data
signal_processor = getattr(settings, 'HAYSTACK_SIGNAL_PROCESSOR', None) signal_processor = getattr(settings, 'HAYSTACK_SIGNAL_PROCESSOR', "celery_haystack.signals.CelerySignalProcessor")
if signal_processor is None:
raise ImproperlyConfigured("When using celery-haystack with Haystack 2.X "
"the HAYSTACK_SIGNAL_PROCESSOR setting must be "
"set. Use 'celery_haystack.signals."
"CelerySignalProcessor' as default.")
from django.db import transaction
from django.db.models import signals from django.db.models import signals
from haystack.signals import BaseSignalProcessor from haystack.signals import RealtimeSignalProcessor
from haystack.exceptions import NotHandled from haystack.exceptions import NotHandled
from haystack.utils import get_identifier from haystack.utils import get_identifier
from .utils import enqueue_task from .conf import settings
from .utils import get_update_task
from .indexes import CelerySearchIndex from .indexes import CelerySearchIndex
class CelerySignalProcessor(BaseSignalProcessor): class CelerySignalProcessor(RealtimeSignalProcessor):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._queue = [] self._queue = []
def setup(self): def setup(self):
signals.post_save.connect(self.enqueue_save) transaction.on_commit(self.run_task)
signals.post_delete.connect(self.enqueue_delete) super().setup()
def teardown(self): def handle_save(self, sender, instance, **kwargs):
signals.post_save.disconnect(self.enqueue_save)
signals.post_delete.disconnect(self.enqueue_delete)
enqueue_task(self._queue)
def enqueue_save(self, sender, instance, **kwargs):
return self.enqueue('update', instance, sender, **kwargs) return self.enqueue('update', instance, sender, **kwargs)
def enqueue_delete(self, sender, instance, **kwargs): def handle_delete(self, sender, instance, **kwargs):
return self.enqueue('delete', instance, sender, **kwargs) return self.enqueue('delete', instance, sender, **kwargs)
def run_task(self):
options = {}
if settings.CELERY_HAYSTACK_QUEUE:
options['queue'] = settings.CELERY_HAYSTACK_QUEUE
if settings.CELERY_HAYSTACK_COUNTDOWN:
options['countdown'] = settings.CELERY_HAYSTACK_COUNTDOWN
task = get_update_task()
task.apply_async((self._queue,), {}, **options)
def enqueue(self, action, instance, sender, **kwargs): def enqueue(self, action, instance, sender, **kwargs):
""" """
Given an individual model instance, determine if a backend Given an individual model instance, determine if a backend
......
...@@ -3,7 +3,6 @@ try: ...@@ -3,7 +3,6 @@ try:
from importlib import import_module from importlib import import_module
except ImportError: except ImportError:
from django.utils.importlib import import_module from django.utils.importlib import import_module
from django.db import connection, transaction
from .conf import settings from .conf import settings
...@@ -22,31 +21,3 @@ def get_update_task(task_path=None): ...@@ -22,31 +21,3 @@ def get_update_task(task_path=None):
raise ImproperlyConfigured('Module "%s" does not define a "%s" ' raise ImproperlyConfigured('Module "%s" does not define a "%s" '
'class.' % (module, attr)) 'class.' % (module, attr))
return task return task
def enqueue_task(queue, **kwargs):
"""
Common utility for enqueing a task for the given action and
model instance.
"""
options = {}
if settings.CELERY_HAYSTACK_QUEUE:
options['queue'] = settings.CELERY_HAYSTACK_QUEUE
if settings.CELERY_HAYSTACK_COUNTDOWN:
options['countdown'] = settings.CELERY_HAYSTACK_COUNTDOWN
task = get_update_task()
task_func = lambda: task.apply_async((queue,), kwargs, **options)
if hasattr(transaction, 'on_commit'):
# Django 1.9 on_commit hook
transaction.on_commit(
task_func
)
elif hasattr(connection, 'on_commit'):
# Django-transaction-hooks
connection.on_commit(
task_func
)
else:
task_func()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment