Coverage for django_napse/core/tasks/candle_collector.py: 18%
83 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-12 13:49 +0000
1import requests
2from django.core.exceptions import ValidationError
4from django_napse.core.models.bots.controller import Controller
5from django_napse.core.tasks.base_tasks import BaseTask
class CandleCollectorTask(BaseTask):
    """Collect the last closed candle and the current candle for every controller.

    For each ``Controller`` the task queries Binance's klines endpoint, reshapes the
    two returned candles and hands them to ``controller.send_candles_to_bots``.
    Controllers that fail on the primary endpoint are retried on the backup endpoints.
    """

    name = "candle_collector"
    # NOTE(review): presumably the scheduling period consumed by BaseTask; the unit is
    # not visible from this file -- confirm.
    interval_time = 30

    @staticmethod
    def build_candle(request_json: list[list[int | float | str]]) -> tuple[dict[str, int | float], dict[str, int | float]]:
        """Structure closed_candle & current_candle from the request.json().

        Candle shape: {"T": 1623000000000, "O": 1.0, "H": 1.0, "L": 1.0, "C": 1.0, "V": 1.0}.

        Args:
        ----
            request_json: response of binance's api (a list of 2 raw candles, oldest first)

        Returns:
        -------
            closed_candle (dict[str, int | float]): last closed candle
            current_candle (dict[str, int | float]): current candle

        Raises:
        ------
            ValidationError: if request_json is not a list of 2 lists of equal length
                holding at least one value per candle label
        """
        labels = ("T", "O", "H", "L", "C", "V")

        def to_number(raw: int | float | str) -> int | float:
            # Integers (e.g. the open time) are kept as-is; everything else is parsed
            # as a float. Each value is converted on its own so that a type difference
            # between the two candles can never leave a raw string in the result.
            return raw if isinstance(raw, int) else float(raw)

        # Check request
        if not isinstance(request_json, list):
            error_msg = f"request_json is not a list ({type(request_json)})"
            raise ValidationError(error_msg)

        if len(request_json) != 2:
            error_msg = f"request_json must have 2 elements ({len(request_json)})"
            raise ValidationError(error_msg)

        closed_raw, current_raw = request_json
        if not isinstance(closed_raw, list) or not isinstance(current_raw, list):
            error_msg = f"request_json is not a list of 2 lists ({type(closed_raw)}, {type(current_raw)})"
            raise ValidationError(error_msg)

        if len(closed_raw) != len(current_raw):
            error_msg = f"The 2 lists of requests json must have the same length ({len(closed_raw)}, {len(current_raw)})"
            raise ValidationError(error_msg)

        # Both lists have the same length here, so checking one of them is enough.
        if len(closed_raw) < len(labels):
            error_msg = f"Request_json's lists are too short ({len(closed_raw)}, {len(current_raw)})"
            raise ValidationError(error_msg)

        # Build candle with the good shape. zip() stops after the last label, so any
        # extra trailing fields in the raw candles are ignored.
        closed_candle: dict[str, int | float] = {label: to_number(raw) for label, raw in zip(labels, closed_raw)}
        current_candle: dict[str, int | float] = {label: to_number(raw) for label, raw in zip(labels, current_raw)}
        return closed_candle, current_candle

    @staticmethod
    def request_get(pair: str, interval: str, api: str = "api") -> requests.Response | None:
        """Make a request to binance's api to get 2 candles (the last closed and the current one).

        Args:
        ----
            pair (str): pair of the candles
            interval (str): interval of the candles
            api (str, optional): api to use (sub-domain of binance.com)

        Returns:
        -------
            response of the request (requests.Response), or None when the request could
            not be completed (connection error, timeout, ...)
        """
        url = f"https://{api}.binance.com/api/v3/klines?symbol={pair}&interval={interval}&limit=2"
        try:
            return requests.get(url, timeout=10)
        except requests.exceptions.RequestException:
            # RequestException also covers timeouts: with ``timeout=10`` a slow endpoint
            # raises ReadTimeout, which is not a ConnectionError. Returning None lets the
            # caller fall back to another endpoint instead of crashing.
            return None

    def get_candles(self, pair: str, interval: str) -> tuple[dict[str, int | float], dict[str, int | float]]:
        """Get candles from binance's api.

        Retry automatically on all binance's backup api if the request failed.

        Args:
        ----
            pair: pair of the candles
            interval: interval of the candles

        Returns:
        -------
            closed_candle, current_candle

        Raises:
        ------
            ValueError: if request failed on all apis
            ValidationError: if an api answered with a payload ``build_candle`` rejects
        """
        apis = ("api", "api1", "api2", "api3")

        for api in apis:
            response = self.request_get(pair, interval, api=api)
            # request_get returns None when the endpoint is unreachable.
            if response is not None and response.status_code == 200:
                return self.build_candle(response.json())

        # All requests failed
        error_msg = f"Impossible to get candles from binance's api (pair: {pair}, interval: {interval})"
        raise ValueError(error_msg)

    def _fetch_candles(self, controller: Controller, api: str) -> tuple[dict[str, int | float], dict[str, int | float]] | None:
        """Fetch and structure both candles of ``controller`` from a single endpoint.

        Args:
        ----
            controller: controller whose ``pair`` and ``interval`` are requested
            api: api to use

        Returns:
        -------
            (closed_candle, current_candle), or None if the request failed, did not
            answer with status 200, or returned a payload that could not be parsed
        """
        response = self.request_get(controller.pair, controller.interval, api)
        if response is None or response.status_code != 200:
            return None
        try:
            return self.build_candle(response.json())
        except (ValidationError, ValueError) as error:
            # A malformed body (invalid JSON or unexpected shape) is handled like any
            # other failure of this endpoint so the other controllers still get served.
            self.logger.warning(f"Controller {controller.pk} received an invalid payload from '{api}': {error}")
            return None

    def run(self) -> None:
        """Run the task.

        Try to get the results of request of binance's api and send it to controller(s).
        If the request failed, the controller is added to a list, and the controllers in
        this list are tried again (on all binance's backup api) at the end.
        """
        print("CandleCollectorTask")
        if not self.avoid_overlap(verbose=True):
            print("skipped")
            return

        # NOTE(review): the returned orders are accumulated but not used further in
        # this method; kept so the return value of send_candles_to_bots is still consumed.
        all_orders = []

        # First pass: every controller on the primary endpoint.
        failed_controllers = []
        for controller in Controller.objects.all():
            candles = self._fetch_candles(controller, "api")
            if candles is None:
                self.logger.warning(f"Controller {controller.pk} failed on 'api'")
                failed_controllers.append(controller)
                continue
            closed_candle, current_candle = candles
            all_orders.extend(controller.send_candles_to_bots(closed_candle, current_candle))

        # Second pass: only the failed controllers, on each backup endpoint in turn.
        backup_apis = ("api1", "api2", "api3")
        failed_controllers_second_attempt = []
        for controller in failed_controllers:
            for api in backup_apis:
                candles = self._fetch_candles(controller, api)
                if candles is None:
                    continue
                self.logger.info(f"{controller} succeeded on '{api}'")
                closed_candle, current_candle = candles
                all_orders.extend(controller.send_candles_to_bots(closed_candle, current_candle))
                break
            else:
                # No backup endpoint produced usable candles.
                failed_controllers_second_attempt.append(controller)

        for controller in failed_controllers_second_attempt:
            self.logger.error(f"{controller} failed on all apis")
# Module-level side effect: both calls run when this module is imported, each on
# its own freshly created task instance.
# NOTE(review): presumably delete_task() clears a stale registration before
# register_task() adds the task again -- confirm against BaseTask.
CandleCollectorTask().delete_task()
CandleCollectorTask().register_task()