Coverage for django_napse/simulations/models/simulations/simulation_queue.py: 96%
249 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-12 13:49 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-12 13:49 +0000
1import time
2import uuid
3from contextlib import suppress
4from copy import deepcopy
5from datetime import datetime, timedelta
7from django.db import models
9from django_napse.core.models.bots.bot import Bot
10from django_napse.core.models.bots.controller import Controller
11from django_napse.core.models.modifications import ArchitectureModification, ConnectionModification, StrategyModification
12from django_napse.core.models.orders.order import Order, OrderBatch
13from django_napse.core.models.transactions.credit import Credit
14from django_napse.simulations.models.datasets.dataset import Candle, DataSet
15from django_napse.simulations.models.simulations.managers import SimulationQueueManager
16from django_napse.utils.constants import EXCHANGE_INTERVALS, ORDER_LEEWAY_PERCENTAGE, ORDER_STATUS, SIDES, SIMULATION_STATUS
17from django_napse.utils.errors import ControllerError
19from .simulation import Simulation
class SimulationQueue(models.Model):
    """Queue wrapper for a simulation.

    Stores the simulation request (space, bot, date range), its progress
    fields (status, completion, eta, error, canceled) and the logic that
    replays candles through a copy of the bot to build a ``Simulation``.
    """

    simulation_reference = models.UUIDField(unique=True, editable=False, default=uuid.uuid4)
    space = models.ForeignKey("django_napse_core.NapseSpace", on_delete=models.CASCADE, null=True)
    bot = models.OneToOneField("django_napse_core.Bot", on_delete=models.CASCADE, null=True)

    start_date = models.DateTimeField()
    end_date = models.DateTimeField()
    canceled = models.BooleanField(default=False)

    status = models.CharField(max_length=12, default=SIMULATION_STATUS.IDLE)
    completion = models.FloatField(default=0.0)
    eta = models.DurationField(blank=True, null=True)

    error = models.BooleanField(default=False)

    created_at = models.DateTimeField(auto_now_add=True)

    objects = SimulationQueueManager()

    def __str__(self) -> str:
        return f"BOT SIM QUEUE {self.pk}"

    def info(self, verbose: bool = True, beacon: str = "") -> str:  # noqa: FBT002, FBT001
        """Return a human-readable description of this SimulationQueue.

        Args:
            verbose: When true, the description is also printed.
            beacon: Prefix prepended to every line of the description.

        Returns:
            The multi-line description.
        """
        lines = [
            f"{beacon}SimulationQueue {self.pk}:\n",
            f"{beacon}\t{self.bot=}\n",
            f"{beacon}\t{self.space=}\n",
            f"{beacon}\t{self.start_date=}\n",
            f"{beacon}\t{self.end_date=}\n",
            f"{beacon}\t{self.created_at=}\n",
            f"{beacon}Investments:\n",
        ]
        # Materialise once so the emptiness check and the loop share one query.
        investments = list(self.investments.all())
        if investments:
            lines.extend(f"{beacon}\t{investment}\n" for investment in investments)
        else:
            lines.append(f"{beacon}\tNo investments\n")
        string = "".join(lines)
        if verbose:
            print(string)
        return string

    def setup_simulation(self) -> tuple[Bot, dict]:
        """Setup investment and connections for the simulation.

        Resets the space's simulation wallet, credits it with every
        investment, copies the bot, connects the copy to the wallet and
        deposits the investments on that connection.

        Returns:
            The copied bot and the data returned by
            ``architecture.prepare_db_data()`` for that copy.
        """
        wallet = self.space.simulation_wallet
        wallet.find().reset()
        investments = list(self.investments.all())
        for investment in investments:
            Credit.objects.create(
                wallet=wallet,
                ticker=investment.ticker,
                amount=investment.amount,
            )
        new_bot = self.bot.copy()
        connection = new_bot.connect_to_wallet(wallet)
        for investment in investments:
            connection.deposit(investment.ticker, investment.amount)
        no_db_data = new_bot.architecture.prepare_db_data()
        return new_bot, no_db_data

    def cleanup_simulation(self, bot: Bot) -> None:
        """Reset the simulation: reset wallet & deactivate the bot."""
        self.space.simulation_wallet.reset()
        bot.hibernate()

    def preparation(self, bot, no_db_data):
        """Build the candle timeline the simulation iterates over.

        Creates a ``DataSet`` for every controller of the bot plus, when the
        controller settings are valid, ``<base>/USDT`` and ``<quote>/USDT``
        datasets at the smallest interval used by the bot.

        Args:
            bot: Bot whose controllers drive the simulation.
            no_db_data: Data produced by ``setup_simulation``.

        Returns:
            A tuple ``(data, currencies, exchange_controllers, min_interval)``
            where ``data`` maps each candle open time to a
            ``{controller: candle dict or None}`` mapping.

        Raises:
            ValueError: If the bot has no controller, or none of its
                controller intervals is listed for the exchange.
        """
        controllers = list(bot.controllers.values())
        if not controllers:
            error_msg = "Cannot prepare a simulation for a bot without controllers."
            raise ValueError(error_msg)

        used_intervals = {controller.interval for controller in controllers}
        exchange_name = controllers[0].exchange_account.exchange.name
        # First interval, in the exchange's declared order, that the bot uses.
        min_interval = next(
            (interval for interval in EXCHANGE_INTERVALS[exchange_name] if interval in used_intervals),
            None,
        )
        if min_interval is None:
            error_msg = f"None of the bot's controller intervals is available on exchange {exchange_name}."
            raise ValueError(error_msg)

        currencies = next(iter(no_db_data["connection_data"].values()))["wallet"]["currencies"]
        exchange_controllers = {controller: controller.exchange_controller for controller in controllers}

        datasets = []
        for controller in controllers:
            datasets.append(DataSet.objects.create(controller=controller, start_date=self.start_date, end_date=self.end_date))
            # USDT-quoted datasets for both legs of the pair; pairs whose
            # controller settings are invalid are skipped.
            for ticker in (controller.base, controller.quote):
                with suppress(ControllerError.InvalidSetting):
                    datasets.append(
                        DataSet.objects.create(
                            controller=Controller.get(
                                exchange_account=controller.exchange_account,
                                base=ticker,
                                quote="USDT",
                                interval=min_interval,
                            ),
                            start_date=self.start_date,
                            end_date=self.end_date,
                        ),
                    )
        datasets = set(datasets)

        candles = Candle.objects.filter(dataset__in=datasets).order_by("open_time")
        open_times = list(candles.values_list("open_time", flat=True).distinct())

        data = {open_time: {controller: None for controller in controllers} for open_time in open_times}
        for candle in candles:
            candle_dict = candle.to_dict()
            controller = candle_dict.pop("controller")
            data[candle.open_time][controller] = candle_dict

        # Forward-fill: a controller with no candle at a given open time
        # reuses its most recent one (None until its first candle).
        for controller in controllers:
            last_candle = None
            for open_time in open_times:
                slot = data[open_time]
                if slot[controller] is None:
                    slot[controller] = last_candle
                else:
                    last_candle = slot[controller]
        return data, currencies, exchange_controllers, min_interval

    def process_candle_data(self, candle_data, min_interval):
        """Convert one timeline slot into bot input and current prices.

        Args:
            candle_data: ``{controller: candle dict or None}`` for one open time.
            min_interval: Interval whose USDT-quoted candles give the prices.

        Returns:
            A tuple ``(processed_data, current_prices)``. ``processed_data``
            has a ``"candles"`` entry per controller and an empty ``"extras"``
            dict; ``current_prices`` maps ``"<base>_price"`` to the close of
            each USDT-quoted candle at ``min_interval``.
        """
        processed_data = {"candles": {}, "extras": {}}
        current_prices = {}
        for controller, candle in candle_data.items():
            processed_data["candles"][controller] = {"current": candle, "latest": candle}
            # A slot may hold None before a controller's first candle; there
            # is no price to record in that case.
            if candle is not None and controller.quote == "USDT" and controller.interval == min_interval:
                current_prices[f"{controller.base}_price"] = candle["close"]
        return processed_data, current_prices

    def append_data(
        self,
        connection_specific_args: dict,
        candle_data: dict,
        current_prices: dict,
        currencies: dict,
        currencies_before: dict,  # noqa: ARG002
        all_orders: list,
        date: datetime,
        dates: list,
        values: list,
        actions: list,
        prices: dict,
        total_amounts: dict,
        amounts: list,
        tickers: list,
        taxes: list,
        extras: dict,
    ) -> None:
        """Append one row per order to the simulation result series.

        The series containers (``dates``, ``values``, ``actions``, ``prices``,
        ``total_amounts``, ``amounts``, ``tickers``, ``taxes``, ``extras``)
        are mutated in place. ``currencies_before`` is accepted for interface
        compatibility and is not used.

        Raises:
            KeyError: If a held ticker, or an order's fee ticker, other than
                USDT has no entry in ``current_prices``.
        """
        current_amounts = {}
        for controller in candle_data:
            for ticker in (controller.base, controller.quote):
                current_amounts[f"{ticker}_amount"] = currencies.get(ticker, {"amount": 0})["amount"]

        # Wallet value expressed in USDT.
        wallet_value = 0
        for ticker, currency in currencies.items():
            price = 1 if ticker == "USDT" else current_prices[f"{ticker}_price"]
            wallet_value += currency["amount"] * price
        rounded_wallet_value = round(wallet_value, 5)

        for index, order in enumerate(all_orders):
            # Orders issued on the same candle are spread one second apart.
            dates.append(date + index * timedelta(seconds=1))
            values.append(rounded_wallet_value)
            actions.append(order.side)
            for ticker_price, price in current_prices.items():
                prices.setdefault(ticker_price, []).append(1 if ticker_price == "USDT" else round(price, 5))
            for ticker_amount, amount in current_amounts.items():
                total_amounts.setdefault(ticker_amount, []).append(round(amount, 5))
            fee_price = 1 if order.fee_ticker == "USDT" else current_prices[f"{order.fee_ticker}_price"]
            taxes.append(round(order.fees * fee_price, 5))
            amounts.append(round(order.asked_for_amount, 5))
            tickers.append(order.asked_for_ticker)
            for plugin, plugin_values in extras.items():
                plugin_values.append(connection_specific_args[plugin].get_value())

    @staticmethod
    def _new_series(no_db_data: dict) -> dict:
        """Return empty result series keyed by ``append_data`` parameter name."""
        connection_specific_args = next(iter(no_db_data["connection_data"].values()))["connection_specific_args"]
        return {
            "dates": [],
            "values": [],
            "actions": [],
            "prices": {},
            "total_amounts": {},
            "taxes": [],
            "amounts": [],
            "tickers": [],
            "extras": {csa.key: [] for csa in connection_specific_args.values()},
        }

    @staticmethod
    def _print_average_tpi(tpi: list) -> None:
        """Print the average time per iteration (0.0 when nothing was iterated)."""
        average = sum(tpi) / len(tpi) if tpi else 0.0
        print(f"Simulation ended.\nAverage TPI: {average}")

    def _create_simulation(self, bot: Bot, series: dict) -> Simulation:
        """Persist the collected series as a ``Simulation`` for this queue."""
        return Simulation.objects.create(
            space=self.space,
            bot=bot,
            start_date=self.start_date,
            end_date=self.end_date,
            simulation_reference=self.simulation_reference,
            data={
                "dates": series["dates"],
                "values": series["values"],
                "actions": series["actions"],
                "taxes": series["taxes"],
                "amounts": series["amounts"],
                "tickers": series["tickers"],
                **series["prices"],
                **series["total_amounts"],
                **series["extras"],
            },
        )

    def quick_simulation(self, bot, no_db_data, verbose=True):
        """Run the simulation on in-memory order and modification objects.

        Orders come from ``bot._get_orders`` as dicts and are turned into
        ``Order`` / modification instances that are built but not saved by
        this method; wallet amounts are tracked in the ``currencies`` dict
        returned by ``preparation``.

        Args:
            bot: Bot copy returned by ``setup_simulation``.
            no_db_data: Data returned by ``setup_simulation``.
            verbose: When true, print the average time per iteration.

        Returns:
            The created ``Simulation``.
        """
        data, currencies, exchange_controllers, min_interval = self.preparation(bot, no_db_data)
        series = self._new_series(no_db_data)
        tpi = []
        _time = time.time()
        for date, candle_data in data.items():
            currencies_before = deepcopy(currencies)
            processed_data, current_prices = self.process_candle_data(
                candle_data=candle_data,
                min_interval=min_interval,
            )
            order_dicts = bot._get_orders(data=processed_data, no_db_data=no_db_data)

            batches = {}
            # Grouped up front: the "controller" key is popped from each dict
            # below, so it cannot be used for filtering afterwards.
            orders_by_controller = {}
            for order_dict in order_dicts:
                debited_amount = order_dict["asked_for_amount"] * (1 + ORDER_LEEWAY_PERCENTAGE / 100)
                if debited_amount > 0:
                    currencies[order_dict["asked_for_ticker"]]["amount"] -= debited_amount
                order_dict["debited_amount"] = debited_amount

                controller = order_dict["controller"]
                if controller not in batches:
                    batches[controller] = OrderBatch(controller=controller)
                orders_by_controller.setdefault(controller, []).append(order_dict)

            for batch in batches.values():
                batch.status = ORDER_STATUS.READY

            all_orders = []
            for controller, batch in batches.items():
                all_modifications = []
                order_objects = []
                for order_dict in orders_by_controller[controller]:
                    order_dict.pop("controller")
                    strategy_modifications = order_dict.pop("StrategyModifications")
                    connection_modifications = order_dict.pop("ConnectionModifications")
                    architecture_modifications = order_dict.pop("ArchitectureModifications")
                    order = Order(batch=batch, **order_dict)
                    order_objects.append(order)
                    all_modifications.extend(StrategyModification(order=order, **modification) for modification in strategy_modifications)
                    all_modifications.extend(ConnectionModification(order=order, **modification) for modification in connection_modifications)
                    all_modifications.extend(ArchitectureModification(order=order, **modification) for modification in architecture_modifications)

                close_price = processed_data["candles"][controller]["latest"]["close"]
                # Kept under its own name so the raw order dicts of the other
                # controllers are not overwritten by this controller's result.
                processed_orders = controller.process_orders(
                    no_db_data={
                        "buy_orders": [order for order in order_objects if order.side == SIDES.BUY],
                        "sell_orders": [order for order in order_objects if order.side == SIDES.SELL],
                        "keep_orders": [order for order in order_objects if order.side == SIDES.KEEP],
                        "batches": [batch],
                        "exchange_controller": exchange_controllers[controller],
                        "min_trade": controller.min_notional / close_price,
                        "price": close_price,
                    },
                    testing=True,
                )
                for order in processed_orders:
                    order._apply_modifications(
                        batch=batch,
                        modifications=[modification for modification in all_modifications if modification.order == order],
                        strategy=no_db_data["strategy"],
                        architecture=no_db_data["architecture"],
                        currencies=currencies,
                    )

                    base_currency = currencies.setdefault(controller.base, {"amount": 0, "mbp": 0})
                    quote_currency = currencies.setdefault(controller.quote, {"amount": 0, "mbp": 0})
                    base_currency["amount"] += order.exit_amount_base
                    quote_currency["amount"] += order.exit_amount_quote

                all_orders += processed_orders

            self.append_data(
                connection_specific_args=next(iter(no_db_data["connection_data"].values()))["connection_specific_args"],
                candle_data=candle_data,
                current_prices=current_prices,
                currencies=currencies,
                currencies_before=currencies_before,
                all_orders=all_orders,
                date=date,
                **series,
            )
            tpi.append(time.time() - _time)
            _time = time.time()

        if verbose:
            self._print_average_tpi(tpi)

        return self._create_simulation(bot, series)

    def irl_simulation(self, bot, no_db_data, verbose=True):
        """Run the simulation through the bot's regular order pipeline.

        Orders and batches come from ``bot.get_orders``; processed orders are
        applied through the controller and wallet amounts are re-read from
        the bot's first connection after every candle.

        Args:
            bot: Bot copy returned by ``setup_simulation``.
            no_db_data: Data returned by ``setup_simulation``.
            verbose: When true, print the average time per iteration.

        Returns:
            The created ``Simulation``.
        """
        data, _, exchange_controllers, min_interval = self.preparation(bot, no_db_data)
        series = self._new_series(no_db_data)
        tpi = []
        _time = time.time()
        currencies = bot.connections.all()[0].wallet.to_dict()["currencies"]
        for date, candle_data in data.items():
            currencies_before = deepcopy(currencies)
            processed_data, current_prices = self.process_candle_data(
                candle_data=candle_data,
                min_interval=min_interval,
            )

            orders, batches = bot.get_orders(data=processed_data)

            all_orders = []
            for controller, batch in batches.items():
                close_price = processed_data["candles"][controller]["latest"]["close"]
                # Kept under its own name so every controller is fed the
                # orders returned by the bot rather than the previous
                # controller's output.
                # NOTE(review): orders are not filtered per controller/batch
                # here - confirm whether multi-controller bots need that.
                processed_orders = controller.process_orders(
                    no_db_data={
                        "buy_orders": [order for order in orders if order.side == SIDES.BUY],
                        "sell_orders": [order for order in orders if order.side == SIDES.SELL],
                        "keep_orders": [order for order in orders if order.side == SIDES.KEEP],
                        "batches": [batch],
                        "exchange_controller": exchange_controllers[controller],
                        "min_trade": controller.min_notional / close_price,
                        "price": close_price,
                    },
                    testing=True,
                )

                controller.apply_orders(processed_orders)
                for order in processed_orders:
                    order.apply_modifications()
                    order.process_payout()
                all_orders += processed_orders

            currencies = bot.connections.all()[0].wallet.to_dict()["currencies"]
            self.append_data(
                connection_specific_args=bot.connections.all()[0].to_dict()["connection_specific_args"],
                candle_data=candle_data,
                current_prices=current_prices,
                currencies_before=currencies_before,
                currencies=currencies,
                all_orders=all_orders,
                date=date,
                **series,
            )
            tpi.append(time.time() - _time)
            _time = time.time()

        if verbose:
            self._print_average_tpi(tpi)

        return self._create_simulation(bot, series)

    def _run_simulation(self, simulate, verbose):
        """Run ``simulate`` between setup and cleanup, tracking the status.

        The status is saved as RUNNING before setup. Cleanup and the switch
        back to IDLE happen in ``finally`` blocks so a failing simulation does
        not leave the queue RUNNING or the copied bot active; the exception
        itself still propagates to the caller.
        """
        self.status = SIMULATION_STATUS.RUNNING
        self.save()
        bot = None
        try:
            bot, no_db_data = self.setup_simulation()
            return simulate(bot=bot, no_db_data=no_db_data, verbose=verbose)
        finally:
            try:
                # bot is still None when setup itself failed.
                if bot is not None:
                    self.cleanup_simulation(bot)
            finally:
                self.status = SIMULATION_STATUS.IDLE
                self.save()

    def run_quick_simulation(self, verbose=True):
        """Set up, run ``quick_simulation`` and clean up; return the Simulation."""
        return self._run_simulation(self.quick_simulation, verbose)

    def run_irl_simulation(self, verbose=True):
        """Set up, run ``irl_simulation`` and clean up; return the Simulation."""
        return self._run_simulation(self.irl_simulation, verbose)

    def is_finished(self):
        """Return True when the status is IDLE and completion equals 100."""
        return self.status == SIMULATION_STATUS.IDLE and self.completion == 100

    def get_status(self):
        """Return a dictionary with the status of the SimulationQueue.

        ``position_in_queue`` is the number of SimulationQueue rows created
        before this one whose ``error`` flag is false.

        Dict shape:
        ```json
        {
            "status": "RUNNING",
            "completion": 56.9,
            "eta": "00:01:04",
            "position_in_queue": 0,
            "error": false
        }
        ```
        """
        return {
            "status": self.status,
            "completion": self.completion,
            "eta": self.eta,
            "position_in_queue": SimulationQueue.objects.filter(created_at__lt=self.created_at, error=False).count(),
            "error": self.error,
        }

    def cancel(self):
        """Flag the simulation as canceled and save."""
        self.canceled = True
        self.save()