Coverage for django_napse/core/tasks/candle_collector.py: 18%

83 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-03-12 13:49 +0000

1import requests 

2from django.core.exceptions import ValidationError 

3 

4from django_napse.core.models.bots.controller import Controller 

5from django_napse.core.tasks.base_tasks import BaseTask 

6 

7 

class CandleCollectorTask(BaseTask):
    """Task that fetches the two most recent candles from binance for every controller.

    For each ``Controller`` the task requests the last closed candle and the candle
    currently being built, reshapes both into ``{"T", "O", "H", "L", "C", "V"}``
    dictionaries and passes them to ``Controller.send_candles_to_bots``.
    """

    # Task identifier and period. NOTE(review): both are presumably consumed by
    # BaseTask when the task is registered (period likely in seconds) — confirm there.
    name = "candle_collector"
    interval_time = 30

    @staticmethod
    def build_candle(request_json: list[list[int | str]]) -> tuple[dict[str, int | float], dict[str, int | float]]:
        """Structure the closed candle and the current candle from the request.json().

        Candle shape: {"T": 1623000000000, "O": 1.0, "H": 1.0, "L": 1.0, "C": 1.0, "V": 1.0}.

        Only the first six fields of each kline are used, extra fields are ignored.
        Every field is converted on its own: ``int`` values are kept untouched and
        anything else goes through ``float()``.

        Args:
        ----
            request_json: decoded response of binance's api, a list of exactly 2 klines
                (the closed one first, the current one second)

        Returns:
        -------
            closed_candle (dict[str, int | float]): last closed candle
            current_candle (dict[str, int | float]): current candle

        Raises:
        ------
            ValidationError: if request_json is not a list of 2 lists of equal length holding at least 6 fields
            ValueError: if a non-int field is a string that ``float()`` cannot parse
            TypeError: if a non-int field has a type that ``float()`` does not accept
        """
        dico_structure_label = ("T", "O", "H", "L", "C", "V")

        # Check request
        if not isinstance(request_json, list):
            error_msg = f"request_json is not a list ({type(request_json)})"
            raise ValidationError(error_msg)

        if len(request_json) != 2:
            error_msg = f"request_json must have 2 elements ({len(request_json)})"
            raise ValidationError(error_msg)

        closed_raw, current_raw = request_json
        if not isinstance(closed_raw, list) or not isinstance(current_raw, list):
            error_msg = f"request_json is not a list of 2 lists ({type(closed_raw)}, {type(current_raw)})"
            raise ValidationError(error_msg)

        if len(closed_raw) != len(current_raw):
            error_msg = f"The 2 lists of requests json must have the same length ({len(closed_raw)}, {len(current_raw)})"
            raise ValidationError(error_msg)

        # Both lists have the same length at this point, so checking one is enough.
        if len(closed_raw) < len(dico_structure_label):
            error_msg = f"Request_json's lists are too short ({len(closed_raw)}, {len(current_raw)})"
            raise ValidationError(error_msg)

        def to_number(value: int | str) -> int | float:
            # Decide per value, so the type of a field in one candle never
            # dictates how the same field of the other candle is converted.
            return value if isinstance(value, int) else float(value)

        # Build candle with the good shape (zip stops after the six labels).
        closed_candle: dict[str, int | float] = {
            label: to_number(raw) for label, raw in zip(dico_structure_label, closed_raw, strict=False)
        }
        current_candle: dict[str, int | float] = {
            label: to_number(raw) for label, raw in zip(dico_structure_label, current_raw, strict=False)
        }
        return closed_candle, current_candle

    @staticmethod
    def request_get(pair: str, interval: str, api: str = "api") -> requests.Response | None:
        """Make a request to binance's api to get 2 candles (the last closed and the current one).

        Args:
        ----
            pair (str): pair of the candles
            interval (str): interval of the candles
            api (str, optional): sub-domain of binance.com to query ("api", "api1", ...)

        Returns:
        -------
            response of the request (requests.Response), or None if the host could not
            be reached or did not answer within the timeout
        """
        url = f"https://{api}.binance.com/api/v3/klines?symbol={pair}&interval={interval}&limit=2"
        try:
            return requests.get(url, timeout=10)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # A read timeout is a Timeout but not a ConnectionError; both mean
            # "this host gave no answer" and must let the caller try another one.
            return None

    def get_candles(self, pair: str, interval: str) -> tuple[dict[str, int | float], dict[str, int | float]]:
        """Get candles from binance's api.

        Retry automatically on all binance's backup api if the request failed.

        Args:
        ----
            pair: pair of the candles
            interval: interval of the candles

        Returns:
        -------
            closed_candle, current_candle

        Raises:
        ------
            ValueError: if the request failed on all apis
            ValidationError: if an api answered with a payload that build_candle rejects
        """
        apis = ("api", "api1", "api2", "api3")

        for api in apis:
            response = self.request_get(pair, interval, api=api)
            # request_get returns None when the host is unreachable: skip to the next one.
            if response is not None and response.status_code == 200:
                return self.build_candle(response.json())
        # All requests failed
        error_msg = f"Impossible to get candles from binance's api (pair: {pair}, interval: {interval})"
        raise ValueError(error_msg)

    def _fetch_candles(self, controller: Controller, api: str) -> tuple[dict[str, int | float], dict[str, int | float]] | None:
        """Fetch and structure the candles of one controller from one binance host.

        Args:
        ----
            controller: controller whose ``pair`` and ``interval`` are requested
            api: sub-domain of binance.com to query

        Returns:
        -------
            (closed_candle, current_candle), or None if the host did not answer with
            status 200 or answered with a payload that cannot be turned into candles
        """
        response = self.request_get(controller.pair, controller.interval, api)
        if response is None or response.status_code != 200:
            return None
        try:
            return self.build_candle(response.json())
        except (ValidationError, ValueError, TypeError) as error:
            # A broken payload from one host must not abort the whole run:
            # report it and let the caller fall back to another host.
            self.logger.warning(f"{controller} got an unusable payload from '{api}': {error}")
            return None

    def run(self) -> None:
        """Run the task.

        Try to get the candles of every controller from binance's main api and send them to the controller's bots.
        Controllers for which the main api failed are retried afterwards on each backup api in turn;
        an error is logged for every controller that failed on all apis.
        """
        print("CandleCollectorTask")
        # Bail out when avoid_overlap() refuses this run.
        if not self.avoid_overlap(verbose=True):
            print("skipped")
            return

        # NOTE(review): all_orders is accumulated but never read within this method.
        all_orders = []
        failed_controllers = []
        for controller in Controller.objects.all():
            candles = self._fetch_candles(controller, "api")
            if candles is None:
                self.logger.warning(f"Controller {controller.pk} failed on 'api'")
                failed_controllers.append(controller)
                continue
            all_orders += controller.send_candles_to_bots(*candles)

        backup_apis = ("api1", "api2", "api3")
        for controller in failed_controllers:
            for api in backup_apis:
                candles = self._fetch_candles(controller, api)
                if candles is not None:
                    self.logger.info(f"{controller} succeeded on '{api}'")
                    all_orders += controller.send_candles_to_bots(*candles)
                    break
            else:
                # No backup host produced usable candles.
                self.logger.error(f"{controller} failed on all apis")

156 

157 

# Import-time side effect: one fresh instance deletes the task, then a second
# fresh instance registers it.
# NOTE(review): presumably this replaces any stale registration with the current
# definition on every import — confirm against BaseTask.delete_task/register_task.
CandleCollectorTask().delete_task()
CandleCollectorTask().register_task()