Coverage for django_napse/simulations/tasks/dataset_queue.py: 44%

25 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-03-12 13:49 +0000

1from django_napse.core.tasks import BaseTask 

2from django_napse.simulations.models import DataSetQueue 

3from django_napse.utils.errors import SimulationError 

4 

5 

class DataSetQueueTask(BaseTask):
    """Recurring task that works through stored ``DataSetQueue`` entries.

    Each invocation of :meth:`run` handles at most one entry: the one with
    the earliest ``created_at`` value.
    """

    name = "dataset_queue"
    # Scheduling / timeout settings read by BaseTask. Units are not visible
    # in this file (presumably seconds -- confirm against BaseTask).
    interval_time = 5
    time_limit = 60 * 60
    soft_time_limit = 60 * 60

    def run(self):
        """Download the oldest ``DataSetQueue`` and remove it once finished.

        The method returns without doing anything further when:

        * ``self.avoid_overlap(verbose=True)`` returns a falsy value,
        * there is no ``DataSetQueue`` row at all, or
        * ``queue.download()`` raises ``TimeoutError`` (the queue is left
          untouched, so a later run picks it up again).

        Raises:
            SimulationError.DataSetQueueError: if the download returned
                normally but ``queue.is_finished()`` is falsy.
        """
        may_proceed = self.avoid_overlap(verbose=True)
        if not may_proceed:
            return

        oldest_first = DataSetQueue.objects.all().order_by("created_at")
        queue = oldest_first.first()
        if queue is None:
            # Nothing is waiting to be processed.
            return

        try:
            queue.download()
        except TimeoutError:
            # Best effort: give up for this run and keep the queue around.
            return

        if not queue.is_finished():
            # The download came back but the queue does not report itself as
            # finished: surface the dataset's state in the error.
            dataset = queue.get_dataset()
            error_msg = f"DataSetQueueTask: {queue} status {dataset.status} completion {dataset.completion}"
            raise SimulationError.DataSetQueueError(error_msg)

        queue.delete()

32 

33 

# Import-time side effect: a fresh DataSetQueueTask instance has delete_task()
# called on it, then a second fresh instance has register_task() called on it.
# NOTE(review): presumably this drops any previously stored registration before
# registering the task again -- confirm against BaseTask. The order matters.
34DataSetQueueTask().delete_task() 

35DataSetQueueTask().register_task()