Coverage for django_napse/simulations/models/simulations/simulation_queue.py: 96%

249 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-03-12 13:49 +0000

1import time 

2import uuid 

3from contextlib import suppress 

4from copy import deepcopy 

5from datetime import datetime, timedelta 

6 

7from django.db import models 

8 

9from django_napse.core.models.bots.bot import Bot 

10from django_napse.core.models.bots.controller import Controller 

11from django_napse.core.models.modifications import ArchitectureModification, ConnectionModification, StrategyModification 

12from django_napse.core.models.orders.order import Order, OrderBatch 

13from django_napse.core.models.transactions.credit import Credit 

14from django_napse.simulations.models.datasets.dataset import Candle, DataSet 

15from django_napse.simulations.models.simulations.managers import SimulationQueueManager 

16from django_napse.utils.constants import EXCHANGE_INTERVALS, ORDER_LEEWAY_PERCENTAGE, ORDER_STATUS, SIDES, SIMULATION_STATUS 

17from django_napse.utils.errors import ControllerError 

18 

19from .simulation import Simulation 

20 

21 

class SimulationQueue(models.Model):
    """Queue wrapper for a simulation.

    Stores the parameters of a simulation (space, bot, date range) together with
    progress-tracking fields, and provides two ways of running it:

    * ``quick``: orders are built as unsaved ``Order`` objects and balances are
      tracked in a plain ``currencies`` dictionary.
    * ``irl``: orders come from ``bot.get_orders`` and balances are read back from
      the bot's first connection's wallet after every candle.
    """

    # Reference shared with the ``Simulation`` created by a run.
    simulation_reference = models.UUIDField(unique=True, editable=False, default=uuid.uuid4)
    space = models.ForeignKey("django_napse_core.NapseSpace", on_delete=models.CASCADE, null=True)
    bot = models.OneToOneField("django_napse_core.Bot", on_delete=models.CASCADE, null=True)

    # Date range used to build the datasets of the simulation.
    start_date = models.DateTimeField()
    end_date = models.DateTimeField()
    # Set to True by ``cancel()``.
    canceled = models.BooleanField(default=False)

    status = models.CharField(max_length=12, default=SIMULATION_STATUS.IDLE)
    # ``is_finished()`` compares this value to 100.
    completion = models.FloatField(default=0.0)
    eta = models.DurationField(blank=True, null=True)

    error = models.BooleanField(default=False)

    created_at = models.DateTimeField(auto_now_add=True)

    objects = SimulationQueueManager()

    def __str__(self) -> str:
        return f"BOT SIM QUEUE {self.pk}"

    def info(self, verbose: bool = True, beacon: str = "") -> str:  # noqa: FBT002, FBT001
        """Return a human-readable description of the SimulationQueue.

        Args:
            verbose: When True, the description is also printed.
            beacon: Prefix added at the start of every line.

        Returns:
            The description as a single string.
        """
        lines = [
            f"{beacon}SimulationQueue {self.pk}:\n",
            f"{beacon}\t{self.bot=}\n",
            f"{beacon}\t{self.space=}\n",
            f"{beacon}\t{self.start_date=}\n",
            f"{beacon}\t{self.end_date=}\n",
            f"{beacon}\t{self.created_at=}\n",
            f"{beacon}Investments:\n",
        ]
        # Evaluate the queryset once instead of issuing a COUNT followed by a SELECT.
        investments = list(self.investments.all())
        if investments:
            lines.extend(f"{beacon}\t{investment}\n" for investment in investments)
        else:
            lines.append(f"{beacon}\tNo investments\n")
        string = "".join(lines)
        if verbose:
            print(string)
        return string

    # NOTE(review): `any` in the return annotation is the builtin function, not `typing.Any`.
    # It is kept unchanged because `Any` is not imported by this module.
    def setup_simulation(self) -> tuple[Bot, dict[str, any]]:
        """Setup investment and connections for the simulation.

        Resets the space's simulation wallet, credits it with every investment,
        copies the bot, connects the copy to the wallet and deposits the
        investments on that connection.

        Returns:
            The copied bot and the result of ``architecture.prepare_db_data()`` for it.
        """
        self.space.simulation_wallet.find().reset()
        investments = list(self.investments.all())
        for investment in investments:
            Credit.objects.create(
                wallet=self.space.simulation_wallet,
                ticker=investment.ticker,
                amount=investment.amount,
            )
        new_bot = self.bot.copy()
        connection = new_bot.connect_to_wallet(self.space.simulation_wallet)
        for investment in investments:
            connection.deposit(investment.ticker, investment.amount)
        no_db_data = new_bot.architecture.prepare_db_data()
        return new_bot, no_db_data

    def cleanup_simulation(self, bot: Bot) -> None:
        """Reset the simulation: reset wallet & deactivate the bot."""
        self.space.simulation_wallet.reset()
        bot.hibernate()

    def preparation(self, bot, no_db_data):
        """Build the candle timeline and lookup tables needed by a simulation run.

        Args:
            bot: Bot whose ``controllers`` mapping drives the simulation.
            no_db_data: Mapping holding a ``"connection_data"`` entry; the wallet
                currencies of its first connection are returned.

        Returns:
            A tuple ``(data, currencies, exchange_controllers, min_interval)`` where
            ``data`` maps each candle open time to ``{controller: candle dict or None}``.

        Raises:
            ValueError: If the bot has no controller, or if none of the exchange's
                intervals is used by a controller.
        """
        controllers = list(bot.controllers.values())
        if not controllers:
            error_msg = "Cannot prepare a simulation for a bot without controllers."
            raise ValueError(error_msg)

        used_intervals = {controller.interval for controller in controllers}
        exchange_name = controllers[0].exchange_account.exchange.name
        # First interval of the exchange (in EXCHANGE_INTERVALS order) used by a controller.
        min_interval = next(
            (interval for interval in EXCHANGE_INTERVALS[exchange_name] if interval in used_intervals),
            None,
        )
        if min_interval is None:
            error_msg = f"No interval of exchange {exchange_name} is used by the bot's controllers."
            raise ValueError(error_msg)

        currencies = next(iter(no_db_data["connection_data"].values()))["wallet"]["currencies"]
        exchange_controllers = {controller: controller.exchange_controller for controller in controllers}

        datasets = []
        for controller in controllers:
            datasets.append(DataSet.objects.create(controller=controller, start_date=self.start_date, end_date=self.end_date))
            # Also load <ticker>/USDT candles for both sides of the pair; they provide the
            # USDT prices read by `process_candle_data`. Invalid pairs are skipped.
            for ticker in (controller.base, controller.quote):
                with suppress(ControllerError.InvalidSetting):
                    datasets.append(
                        DataSet.objects.create(
                            controller=Controller.get(
                                exchange_account=controller.exchange_account,
                                base=ticker,
                                quote="USDT",
                                interval=min_interval,
                            ),
                            start_date=self.start_date,
                            end_date=self.end_date,
                        ),
                    )

        candles = Candle.objects.filter(dataset__in=set(datasets)).order_by("open_time")
        open_times = candles.values_list("open_time", flat=True).distinct()

        # Every open time starts with an empty (None) slot for each bot controller.
        data = {open_time: dict.fromkeys(controllers) for open_time in open_times}
        for candle in candles:
            candle_dict = candle.to_dict()
            controller = candle_dict.pop("controller")
            data[candle.open_time][controller] = candle_dict

        # Forward-fill: a bot controller without a candle at a given open time reuses its
        # previous candle (None until its first candle appears).
        for controller in controllers:
            last_candle = None
            for candles_at_time in data.values():
                if candles_at_time[controller] is None:
                    candles_at_time[controller] = last_candle
                else:
                    last_candle = candles_at_time[controller]
        return data, currencies, exchange_controllers, min_interval

    def process_candle_data(self, candle_data, min_interval):
        """Shape the candles of one open time for the bot and extract USDT prices.

        Args:
            candle_data: Mapping ``{controller: candle dict}`` for a single open time.
            min_interval: Interval a USDT-quoted controller must have to provide a price.

        Returns:
            A tuple ``(processed_data, current_prices)``. ``processed_data`` has a
            ``"candles"`` entry mapping each controller to its ``"current"`` and
            ``"latest"`` candle (both the same candle) and an empty ``"extras"`` entry.
            ``current_prices`` maps ``"<base>_price"`` to the candle's close.
        """
        processed_data = {"candles": {}, "extras": {}}
        current_prices = {}
        for controller, candle in candle_data.items():
            processed_data["candles"][controller] = {"current": candle, "latest": candle}
            if controller.quote == "USDT" and controller.interval == min_interval:
                current_prices[f"{controller.base}_price"] = candle["close"]
        return processed_data, current_prices

    def append_data(
        self,
        connection_specific_args: dict,
        candle_data: dict,
        current_prices: dict,
        currencies: dict,
        currencies_before: dict,
        all_orders: list,
        date: datetime,
        dates: list,
        values: list,
        actions: list,
        prices: dict,
        total_amounts: dict,
        amounts: list,
        tickers: list,
        taxes: list,
        extras: dict,
    ):
        """Append one data point per order to the result series (mutated in place).

        Args:
            connection_specific_args: Mapping from plugin key to an object exposing ``get_value()``.
            candle_data: Mapping whose keys are the controllers of the current candle.
            current_prices: Mapping ``"<ticker>_price"`` -> price in USDT.
            currencies: Current balances, ``{ticker: {"amount": ...}}``.
            currencies_before: Balances before the orders were applied. Currently unused;
                kept so that existing callers keep working.
            all_orders: Orders processed for the current candle.
            date: Open time of the candle; successive orders are offset by one second each.
            dates: Series receiving the date of each order.
            values: Series receiving the wallet value (in USDT) at each order.
            actions: Series receiving the side of each order.
            prices: Mapping of price series, keyed like ``current_prices``.
            total_amounts: Mapping of balance series, keyed ``"<ticker>_amount"``.
            amounts: Series receiving ``asked_for_amount`` of each order.
            tickers: Series receiving ``asked_for_ticker`` of each order.
            taxes: Series receiving the fees of each order converted to USDT.
            extras: Mapping of plugin series, keyed by plugin key.

        Raises:
            KeyError: If a non-USDT ticker of ``currencies`` (or an order's fee ticker)
                has no entry in ``current_prices``.
        """
        current_amounts = {}
        for controller in candle_data:
            for ticker in (controller.base, controller.quote):
                current_amounts[f"{ticker}_amount"] = currencies.get(ticker, {"amount": 0})["amount"]

        wallet_value = 0
        for ticker, currency in currencies.items():
            price = 1 if ticker == "USDT" else current_prices[f"{ticker}_price"]
            wallet_value += currency["amount"] * price

        for index, order in enumerate(all_orders):
            dates.append(date + index * timedelta(seconds=1))
            values.append(round(wallet_value, 5))
            actions.append(order.side)
            # Append in place rather than rebuilding each series for every order.
            for ticker_price, price in current_prices.items():
                prices.setdefault(ticker_price, []).append(1 if ticker_price == "USDT" else round(price, 5))
            for ticker_amount, amount in current_amounts.items():
                total_amounts.setdefault(ticker_amount, []).append(round(amount, 5))
            fee_price = 1 if order.fee_ticker == "USDT" else current_prices[f"{order.fee_ticker}_price"]
            taxes.append(round(order.fees * fee_price, 5))
            amounts.append(round(order.asked_for_amount, 5))
            tickers.append(order.asked_for_ticker)
            for plugin, plugin_series in extras.items():
                plugin_series.append(connection_specific_args[plugin].get_value())

    @staticmethod
    def _empty_series(no_db_data):
        """Return fresh, empty result series keyed by the matching `append_data` parameter names."""
        connection_specific_args = next(iter(no_db_data["connection_data"].values()))["connection_specific_args"]
        return {
            "dates": [],
            "values": [],
            "actions": [],
            "prices": {},
            "total_amounts": {},
            "taxes": [],
            "amounts": [],
            "tickers": [],
            "extras": {csa.key: [] for csa in connection_specific_args.values()},
        }

    @staticmethod
    def _process_orders(controller, batch, orders, exchange_controller, latest_close):
        """Split *orders* by side and run them through ``controller.process_orders`` in testing mode."""
        return controller.process_orders(
            no_db_data={
                "buy_orders": [order for order in orders if order.side == SIDES.BUY],
                "sell_orders": [order for order in orders if order.side == SIDES.SELL],
                "keep_orders": [order for order in orders if order.side == SIDES.KEEP],
                "batches": [batch],
                "exchange_controller": exchange_controller,
                "min_trade": controller.min_notional / latest_close,
                "price": latest_close,
            },
            testing=True,
        )

    @staticmethod
    def _print_summary(tpi):
        """Print the end-of-simulation message with the average time per iteration."""
        # An empty date range yields no iteration; avoid dividing by zero.
        average_tpi = sum(tpi) / len(tpi) if tpi else 0.0
        print(f"Simulation ended.\nAverage TPI: {average_tpi}")

    def _create_simulation(self, bot, series):
        """Persist the collected result series as a ``Simulation`` and return it."""
        return Simulation.objects.create(
            space=self.space,
            bot=bot,
            start_date=self.start_date,
            end_date=self.end_date,
            simulation_reference=self.simulation_reference,
            data={
                "dates": series["dates"],
                "values": series["values"],
                "actions": series["actions"],
                "taxes": series["taxes"],
                "amounts": series["amounts"],
                "tickers": series["tickers"],
                **series["prices"],
                **series["total_amounts"],
                **series["extras"],
            },
        )

    def quick_simulation(self, bot, no_db_data, verbose=True):
        """Run the simulation with unsaved orders and dictionary-tracked balances.

        Args:
            bot: Bot returned by ``setup_simulation``.
            no_db_data: Data returned by ``setup_simulation``; forwarded to ``bot._get_orders``.
            verbose: When True, print the average time per iteration at the end.

        Returns:
            The ``Simulation`` created from the collected series.
        """
        data, currencies, exchange_controllers, min_interval = self.preparation(bot, no_db_data)
        tick = time.time()
        tpi = []
        series = self._empty_series(no_db_data)
        connection_specific_args = next(iter(no_db_data["connection_data"].values()))["connection_specific_args"]

        for date, candle_data in data.items():
            # Snapshot of the balances before this candle's orders (forwarded to `append_data`).
            currencies_before = deepcopy(currencies)
            processed_data, current_prices = self.process_candle_data(
                candle_data=candle_data,
                min_interval=min_interval,
            )
            orders = bot._get_orders(data=processed_data, no_db_data=no_db_data)

            batches = {}
            # Group the raw order dicts per controller *before* their "controller" key is
            # popped below, so that every controller only handles its own orders.
            orders_by_controller = {}
            for order in orders:
                debited_amount = order["asked_for_amount"] * (1 + ORDER_LEEWAY_PERCENTAGE / 100)
                if debited_amount > 0:
                    currencies[order["asked_for_ticker"]]["amount"] -= debited_amount
                order["debited_amount"] = debited_amount

                controller = order["controller"]
                batches[controller] = OrderBatch(controller=controller)
                orders_by_controller.setdefault(controller, []).append(order)

            for batch in batches.values():
                batch.status = ORDER_STATUS.READY

            all_orders = []
            for controller, batch in batches.items():
                all_modifications = []
                order_objects = []
                for order_data in orders_by_controller[controller]:
                    order_data.pop("controller")
                    strategy_modifications = order_data.pop("StrategyModifications")
                    connection_modifications = order_data.pop("ConnectionModifications")
                    architecture_modifications = order_data.pop("ArchitectureModifications")
                    order = Order(batch=batch, **order_data)
                    order_objects.append(order)
                    all_modifications.extend(StrategyModification(order=order, **modification) for modification in strategy_modifications)
                    all_modifications.extend(ConnectionModification(order=order, **modification) for modification in connection_modifications)
                    all_modifications.extend(ArchitectureModification(order=order, **modification) for modification in architecture_modifications)

                # Kept under its own name: rebinding `orders` here would corrupt the
                # handling of the remaining controllers.
                processed_orders = self._process_orders(
                    controller=controller,
                    batch=batch,
                    orders=order_objects,
                    exchange_controller=exchange_controllers[controller],
                    latest_close=processed_data["candles"][controller]["latest"]["close"],
                )
                for order in processed_orders:
                    order._apply_modifications(
                        batch=batch,
                        modifications=[modification for modification in all_modifications if modification.order == order],
                        strategy=no_db_data["strategy"],
                        architecture=no_db_data["architecture"],
                        currencies=currencies,
                    )

                    base_balance = currencies.setdefault(controller.base, {"amount": 0, "mbp": 0})
                    quote_balance = currencies.setdefault(controller.quote, {"amount": 0, "mbp": 0})
                    base_balance["amount"] += order.exit_amount_base
                    quote_balance["amount"] += order.exit_amount_quote

                all_orders += processed_orders

            self.append_data(
                connection_specific_args=connection_specific_args,
                candle_data=candle_data,
                current_prices=current_prices,
                currencies=currencies,
                currencies_before=currencies_before,
                all_orders=all_orders,
                date=date,
                **series,
            )
            tpi.append(time.time() - tick)
            tick = time.time()

        if verbose:
            self._print_summary(tpi)

        return self._create_simulation(bot, series)

    def irl_simulation(self, bot, no_db_data, verbose=True):
        """Run the simulation through the bot's regular order pipeline.

        Orders come from ``bot.get_orders``, are applied with ``controller.apply_orders``
        and the balances are read back from the wallet of the bot's first connection
        after every candle.

        Args:
            bot: Bot returned by ``setup_simulation``.
            no_db_data: Data returned by ``setup_simulation``.
            verbose: When True, print the average time per iteration at the end.

        Returns:
            The ``Simulation`` created from the collected series.
        """
        data, _, exchange_controllers, min_interval = self.preparation(bot, no_db_data)
        tick = time.time()
        tpi = []
        series = self._empty_series(no_db_data)
        currencies = bot.connections.all()[0].wallet.to_dict()["currencies"]

        for date, candle_data in data.items():
            # Snapshot of the balances before this candle's orders (forwarded to `append_data`).
            currencies_before = deepcopy(currencies)
            processed_data, current_prices = self.process_candle_data(
                candle_data=candle_data,
                min_interval=min_interval,
            )

            orders, batches = bot.get_orders(data=processed_data)

            all_orders = []
            for controller, batch in batches.items():
                # Kept under its own name so that the next controller still receives the
                # bot's orders rather than this controller's processed output.
                # NOTE(review): `orders` is not filtered per controller here - confirm
                # whether each controller should only receive the orders of its own batch.
                processed_orders = self._process_orders(
                    controller=controller,
                    batch=batch,
                    orders=orders,
                    exchange_controller=exchange_controllers[controller],
                    latest_close=processed_data["candles"][controller]["latest"]["close"],
                )

                controller.apply_orders(processed_orders)
                for order in processed_orders:
                    order.apply_modifications()
                    order.process_payout()
                all_orders += processed_orders

            currencies = bot.connections.all()[0].wallet.to_dict()["currencies"]
            self.append_data(
                connection_specific_args=bot.connections.all()[0].to_dict()["connection_specific_args"],
                candle_data=candle_data,
                current_prices=current_prices,
                currencies_before=currencies_before,
                currencies=currencies,
                all_orders=all_orders,
                date=date,
                **series,
            )
            tpi.append(time.time() - tick)
            tick = time.time()

        if verbose:
            self._print_summary(tpi)

        return self._create_simulation(bot, series)

    def _run_simulation(self, simulate, verbose):
        """Run *simulate* between setup and cleanup, tracking the queue status.

        The wallet/bot cleanup and the status reset are performed even when the
        simulation raises, so a failed run does not leave the queue marked as running.
        """
        self.status = SIMULATION_STATUS.RUNNING
        self.save()
        try:
            bot, no_db_data = self.setup_simulation()
            try:
                return simulate(bot=bot, no_db_data=no_db_data, verbose=verbose)
            finally:
                self.cleanup_simulation(bot)
        finally:
            self.status = SIMULATION_STATUS.IDLE
            self.save()

    def run_quick_simulation(self, verbose=True):
        """Set up, run and clean up a quick simulation; return the created ``Simulation``."""
        return self._run_simulation(self.quick_simulation, verbose)

    def run_irl_simulation(self, verbose=True):
        """Set up, run and clean up an irl simulation; return the created ``Simulation``."""
        return self._run_simulation(self.irl_simulation, verbose)

    def is_finished(self):
        """Return True when the status is IDLE and the completion equals 100."""
        return self.status == SIMULATION_STATUS.IDLE and self.completion == 100

    def get_status(self):
        """Return a dictionary with the status of the SimulationQueue.

        ``position_in_queue`` is the number of SimulationQueue rows created before
        this one whose ``error`` flag is False.

        Dict shape:
        ```json
        {
            "status": "RUNNING",
            "completion": 56.9,
            "eta": 00:01:04,
            "position_in_queue": 0,
            "error": false,
        }
        ```
        """
        return {
            "status": self.status,
            "completion": self.completion,
            "eta": self.eta,
            "position_in_queue": SimulationQueue.objects.filter(created_at__lt=self.created_at, error=False).count(),
            "error": self.error,
        }

    def cancel(self):
        """Flag the SimulationQueue as canceled and save it."""
        self.canceled = True
        self.save()