aboutsummaryrefslogtreecommitdiffstats
path: root/python/ops/product_loop.py
blob: 58f7984c4381b123556ace6c627c953093ae6aca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
from pickle import loads

from serenitas.utils import get_redis_queue
from trade_dataclasses import DealKind

from .funds import Selene


def upload_products():
    """Make one pass over ``product_queue`` and upload the staged products.

    Each queue entry is a pickled ``(trade_type, trade)`` pair.  The pair is
    turned into a deal object via ``DealKind[trade_type].from_dict(**trade)``
    and its ``product.to_citco()`` result is appended to the Selene staging
    queue.  Entries whose product status is ``"Pending"`` are pushed back onto
    the tail of the queue so a later run sees them again.

    The pass ends when the queue is empty or when an entry that was already
    handled during this call comes around again.  The staged items are then
    written out with ``Selene.build_buffer()`` and ``Selene.upload()``.
    """
    # NOTE(review): the original read an undefined global ``q``.  This assumes
    # get_redis_queue() takes no arguments and returns the redis client that
    # holds "product_queue" -- confirm against serenitas.utils.
    q = get_redis_queue()
    selene = Selene()
    # Raw payloads already handled in this pass.  The payload itself is used
    # as the key because ``trade`` is splatted with ``**`` below, i.e. it is a
    # mapping, and dicts are not hashable.
    seen = set()
    # lpop returns None once the queue is drained; stop instead of unpickling it.
    while (payload := q.lpop("product_queue")) is not None:
        if payload in seen:  # we've gone full circle
            # This entry was re-queued earlier in the pass; put it back at the
            # head so it is not lost by the pop above.
            q.lpush("product_queue", payload)
            break
        seen.add(payload)
        # NOTE(security): unpickling is only safe if every producer writing to
        # this queue is trusted.
        trade_type, trade = loads(payload)
        obj = DealKind[trade_type].from_dict(**trade)
        product = obj.product
        if product.status == "Pending":
            q.rpush("product_queue", payload)
        selene.staging_queue.append(product.to_citco())
    buf, dest = selene.build_buffer()
    selene.upload(buf, dest.name)


async def gather_product(interval: float = 300):
    """Call ``do_something()`` every *interval* seconds, forever.

    The coroutine wakes up once per second, compares the running event
    loop's monotonic clock against the next deadline and, once the deadline
    has passed, runs the job and schedules the next deadline *interval*
    seconds after the job returned.  It never returns on its own; cancel the
    task to stop it.

    Args:
        interval: Seconds between two runs of the job (default 300).
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + interval
    while True:
        if loop.time() >= deadline:
            # NOTE(review): do_something is not defined in this file; it looks
            # like a placeholder for the periodic job -- confirm what should
            # run here.  It is called synchronously, so it blocks the event
            # loop for as long as it runs.
            do_something()
            deadline = loop.time() + interval
        await asyncio.sleep(1)