import asyncio
from pickle import loads

from serenitas.utils import get_redis_queue
from trade_dataclasses import DealKind

from .funds import Selene

# Redis list holding pickled ``(trade_type, trade)`` tuples.
_PRODUCT_QUEUE = "product_queue"


def upload_products():
    """Drain the product queue once and upload the pending products.

    Each queue entry is a pickled ``(trade_type, trade)`` pair. The trade is
    rebuilt through ``DealKind[trade_type].from_dict(**trade)`` and its
    ``product`` is inspected. Products whose status is ``"Pending"`` are
    pushed back onto the tail of the queue and staged on the ``Selene``
    instance. The loop stops when the queue is empty or when an entry that
    was already re-queued during this call comes around again. Whatever was
    staged is then built into a buffer and uploaded.
    """
    # NOTE(review): assumes get_redis_queue() needs no arguments and returns
    # a client exposing lpop/lpush/rpush -- confirm against serenitas.utils.
    q = get_redis_queue()
    selene = Selene()
    # Raw payloads re-queued during this call. The bytes themselves are the
    # key because the decoded trade is splatted with ** below, i.e. it is a
    # mapping, and a plain dict cannot be hashed.
    requeued = set()
    while True:
        payload = q.lpop(_PRODUCT_QUEUE)
        if payload is None:
            # Queue is empty: nothing left to inspect.
            break
        if payload in requeued:
            # We've gone full circle. Put the entry back at the head so it
            # is not lost from the queue.
            q.lpush(_PRODUCT_QUEUE, payload)
            break
        # SECURITY: pickle.loads executes arbitrary code from its input; the
        # queue must only ever be written by trusted producers.
        trade_type, trade = loads(payload)
        product = DealKind[trade_type].from_dict(**trade).product
        # NOTE(review): the original indentation was lost; re-queueing,
        # staging and marking are assumed to apply to pending products
        # only -- confirm.
        if product.status == "Pending":
            q.rpush(_PRODUCT_QUEUE, payload)
            selene.staging_queue.append(product.to_citco())
            requeued.add(payload)
    buf, dest = selene.build_buffer()
    selene.upload(buf, dest.name)


async def gather_product():
    """Call ``do_something()`` every 300 seconds of event-loop time, forever.

    The coroutine never returns on its own. It wakes once per second, and
    when the loop clock has passed the current deadline it runs
    ``do_something()`` and schedules the next deadline 300 seconds later.
    """
    interval = 300
    loop = asyncio.get_running_loop()
    end_time = loop.time() + interval
    while True:
        if loop.time() >= end_time:
            # NOTE(review): do_something is not defined in the visible part
            # of this module -- confirm it exists before relying on this.
            do_something()
            end_time = loop.time() + interval
        await asyncio.sleep(1)