blob: 58f7984c4381b123556ace6c627c953093ae6aca (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
from serenitas.utils import get_redis_queue
from trade_dataclasses import DealKind
from pickle import loads
from .funds import Selene
def upload_products():
selene = Selene()
cache = set()
while True:
pickle = q.lpop("product_queue")
trade_type, trade = loads(pickle)
if (h := hash(trade)) in cache: # we've gone full circle
break
else:
obj = DealKind[trade_type].from_dict(**trade)
product = obj.product
if product.status == "Pending":
q.rpush("product_queue", pickle)
selene.staging_queue.append(product.to_citco())
_cache |= h
buf, dest = selene.build_buffer()
selene.upload(buf, dest.name)
async def gather_product():
loop = asynctio.get_running_loop()
end_time = loop.time() + 300
while True:
if loop.time() >= end_time:
do_something()
end_time = loop.time() + 300
await asyncio.sleep(1)
|