Coverage for django_napse/core/tasks/base_tasks.py: 62%
53 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-12 13:49 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-12 13:49 +0000
1import celery
2from celery.utils.log import get_task_logger
3from django.db import IntegrityError
4from django.db.utils import ProgrammingError
5from django_celery_beat.models import IntervalSchedule, PeriodicTask
7from django_napse.core.celery_app import celery_app, strategy_log_free
class BaseTask(celery.Task):
    """Celery task that can schedule itself through django-celery-beat.

    The class bundles the task body (``run``) with helpers that create or
    delete the matching ``PeriodicTask`` row, register the task on
    ``celery_app`` and inspect the workers to detect overlapping executions.
    """

    name = "base_task"
    Strategy = strategy_log_free
    # NOTE: the logger is built once, at class-definition time, from the
    # class-level ``name`` above. A subclass that only overrides ``name``
    # inherits this same logger object.
    logger = get_task_logger(name)
    # Scheduling period, in seconds (used with ``IntervalSchedule.SECONDS``).
    interval_time = 5  # Impossible to make dynamic modification because of celery

    def run(self):
        """Task body; a no-op in this base class."""
        pass

    def create_task(self) -> None:
        """Create the periodic-task entry that schedules this task.

        Fetches the ``IntervalSchedule`` of ``interval_time`` seconds (creating
        it when it does not exist) and attaches a ``PeriodicTask`` named
        ``period_<name>`` to it.

        No exception is raised for the handled cases:

        * ``ProgrammingError`` while reading the schedule: the method returns
          without creating anything (presumably the beat tables are not
          migrated yet -- TODO confirm).
        * ``IntegrityError`` while creating the periodic task: ignored
          (presumably the entry already exists -- TODO confirm).
        """
        try:
            schedule = IntervalSchedule.objects.get(every=self.interval_time, period=IntervalSchedule.SECONDS)
        except ProgrammingError:
            return
        except IntervalSchedule.DoesNotExist:
            schedule = IntervalSchedule.objects.create(every=self.interval_time, period=IntervalSchedule.SECONDS)

        try:
            PeriodicTask.objects.create(interval=schedule, name=f"period_{self.name}", task=self.name)
        except IntegrityError:
            # Best effort: a failed insert is deliberately not an error here.
            pass
        else:
            self.logger.info(f"Period task {self.name} created")

    def delete_task(self) -> None:
        """Delete the periodic-task entry whose ``task`` field is this task's name.

        A missing entry (``PeriodicTask.DoesNotExist``) and a
        ``ProgrammingError`` raised by the query are both ignored, so the
        method raises nothing for those cases.
        """
        try:
            PeriodicTask.objects.get(task=self.name).delete()
        except (PeriodicTask.DoesNotExist, ProgrammingError):
            # Best effort: nothing to delete, or the table cannot be queried.
            pass
        else:
            self.logger.info(f"Period task {self.name} deleted")

    def register_task(self):
        """Register this task on ``celery_app`` and create its periodic entry."""
        celery_app.register_task(self)
        self.create_task()
        self.logger.info(f"Period task {self.name} registered")

    def active_tasks(self):
        """Return what ``celery_app.control.inspect().active()`` reports.

        Per the Celery documentation this is a mapping of worker name to the
        list of tasks that worker is currently executing, or ``None`` when no
        worker replied.
        """
        return celery_app.control.inspect().active()

    def num_active_tasks(self):
        """Count the active executions, across all workers, of this task.

        Returns:
            The number of active task entries whose ``"name"`` equals
            ``self.name``; ``0`` when no worker replied to the inspection.
        """
        # ``active()`` yields ``None`` when no worker answers; treat that as
        # "nothing is running" instead of failing on iteration.
        tasks_by_worker = self.active_tasks() or {}
        return sum(
            1
            for worker_tasks in tasks_by_worker.values()
            for task in worker_tasks
            if task["name"] == self.name
        )

    def avoid_overlap(self, verbose=False):
        """Tell whether this task may proceed without overlapping another run.

        Args:
            verbose: when true, log a message if an overlap is detected.

        Returns:
            ``False`` when more than one execution of this task is active,
            ``True`` otherwise. The threshold is one rather than zero,
            presumably because the calling execution is itself counted as
            active -- TODO confirm against callers.
        """
        if self.num_active_tasks() > 1:
            if verbose:
                self.logger.info(f"Period task {self.name} already running")
            return False
        return True