Coverage for django_napse/core/tasks/base_tasks.py: 62%

53 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-03-12 13:49 +0000

1import celery 

2from celery.utils.log import get_task_logger 

3from django.db import IntegrityError 

4from django.db.utils import ProgrammingError 

5from django_celery_beat.models import IntervalSchedule, PeriodicTask 

6 

7from django_napse.core.celery_app import celery_app, strategy_log_free 

8 

9 

class BaseTask(celery.Task):
    """Base class for Celery tasks that are scheduled through django-celery-beat.

    Subclasses are expected to override ``name``, ``interval_time`` and ``run``.
    The helper methods below manage the ``PeriodicTask`` database entry that
    triggers the task and let a running task detect overlapping executions.
    """

    name = "base_task"
    Strategy = strategy_log_free
    logger = get_task_logger(name)
    interval_time = 5  # Impossible to make dynamic modification because of celery

    def run(self):
        """Do nothing; this is the task body that subclasses override."""

    def create_task(self) -> None:
        """Create the ``PeriodicTask`` entry that schedules this task.

        An ``IntervalSchedule`` of ``interval_time`` seconds is reused when one
        exists and created otherwise, then a ``PeriodicTask`` named
        ``period_<name>`` is created for it.

        The method returns silently when the database raises
        ``ProgrammingError`` during the schedule lookup/creation, or
        ``IntegrityError`` while creating the periodic task.
        """
        try:
            # ``filter().first()`` instead of ``get()``: several identical
            # schedules may exist, and ``get()`` would then raise
            # ``MultipleObjectsReturned``.
            schedule = IntervalSchedule.objects.filter(
                every=self.interval_time,
                period=IntervalSchedule.SECONDS,
            ).first()
            if schedule is None:
                schedule = IntervalSchedule.objects.create(
                    every=self.interval_time,
                    period=IntervalSchedule.SECONDS,
                )
        except ProgrammingError:
            # NOTE(review): presumably the django-celery-beat tables are not
            # migrated yet when this fires - confirm.
            return

        try:
            PeriodicTask.objects.create(interval=schedule, name=f"period_{self.name}", task=self.name)
        except IntegrityError:
            # NOTE(review): presumably the periodic task already exists - confirm.
            return
        self.logger.info(f"Period task {self.name} created")

    def delete_task(self) -> None:
        """Delete every ``PeriodicTask`` entry that points at this task.

        Nothing happens (and nothing is logged) when no entry exists or when
        the database raises ``ProgrammingError``.
        """
        try:
            # Delete instance by instance so that more than one matching row
            # does not raise ``MultipleObjectsReturned`` and each model's own
            # ``delete()`` is still the one being called.
            periodic_tasks = list(PeriodicTask.objects.filter(task=self.name))
            for periodic_task in periodic_tasks:
                periodic_task.delete()
        except ProgrammingError:
            return

        if periodic_tasks:
            self.logger.info(f"Period task {self.name} deleted")

    def register_task(self):
        """Register this task with the Celery app and create its periodic entry."""
        celery_app.register_task(self)
        self.create_task()
        self.logger.info(f"Period task {self.name} registered")

    def active_tasks(self):
        """Return the result of ``celery_app.control.inspect().active()``.

        The value may be ``None`` when no worker answers the inspection.
        """
        return celery_app.control.inspect().active()

    def num_active_tasks(self):
        """Return how many active tasks, across all workers, carry this task's name.

        Returns:
            The number of matching active tasks; ``0`` when the inspection
            yields no reply at all.
        """
        # ``inspect().active()`` yields ``None`` when no worker replies.
        active_by_worker = self.active_tasks() or {}
        return sum(
            1
            for worker_tasks in active_by_worker.values()
            for task in worker_tasks
            if task["name"] == self.name
        )

    def avoid_overlap(self, verbose=False):
        """Tell whether this task may proceed without overlapping another run.

        Args:
            verbose: When true, log a message if an overlap is detected.

        Returns:
            ``False`` when more than one active task has this task's name,
            ``True`` otherwise.
        """
        # NOTE(review): the threshold is 1 presumably because the calling task
        # itself is reported as active - confirm.
        if self.num_active_tasks() <= 1:
            return True
        if verbose:
            self.logger.info(f"Period task {self.name} already running")
        return False

73 return True