diff options
| author | Thibaut Horel <thibaut.horel@gmail.com> | 2014-01-28 00:14:54 -0500 |
|---|---|---|
| committer | Thibaut Horel <thibaut.horel@gmail.com> | 2014-01-28 00:14:54 -0500 |
| commit | 9f32604638a14cce101f9a3b1dd08971a8142f58 (patch) | |
| tree | e863695593821dce6004678df8990f964888fe89 /dispatcher.py | |
| download | fast-seeding-9f32604638a14cce101f9a3b1dd08971a8142f58.tar.gz | |
Initial commit
Diffstat (limited to 'dispatcher.py')
| -rw-r--r-- | dispatcher.py | 247 |
1 file changed, 247 insertions, 0 deletions
from glob import glob
import os.path as pa
from time import sleep, time
import requests
from requests.exceptions import ConnectionError
from urllib import urlretrieve
from uuid import uuid1
import tarfile
import itertools
from fnmatch import fnmatch
import os


def get_values(filename):
    """Yield the whitespace-separated fields of each non-blank line.

    Lines that are empty or contain only whitespace are skipped.
    """
    with open(filename) as f:
        for line in f:
            # str.split() with no argument already discards surrounding
            # whitespace, so the original's extra strip() pass is redundant.
            fields = line.split()
            if fields:
                yield fields


class Dispatcher:
    """Distribute crawl work (follower fetches and user-id lookups) across a
    pool of HTTP worker servers and aggregate the result files the workers
    drop under data/users/.
    """

    def __init__(self, filename):
        """Load prior state from data/ and poll the servers in *filename*.

        filename -- path of a text file listing one worker base URL per line.
        """
        # user_id -> (user_name, followers_count): follower fetches to send
        self.followers_queue = {}
        # ids awaiting a full lookup / (id, name) pairs awaiting a short one
        self.lookups_queue = set()
        self.short_lookup_queue = set()
        # work currently in flight on some server
        self.current_followers = set()
        self.current_lookups = set()
        # user_id -> user_name for every user already known
        self.users = {}
        self.last_time = 0          # last time update_servers() really polled
        self.servers = {}           # base URL -> last /status payload
        # Seed files are deliberately kept open: run() tails them for new
        # lines, and __del__ closes them.
        self.files = [open(fname) for fname in glob("data/*.txt")]
        self.filename = filename
        self.update_servers(first=True)

        # Replay seed files and any results already on disk so a restart
        # does not re-schedule work that is finished or queued.
        for fh in self.files:
            for line in fh:
                values = line.strip().split()
                self.add_user(*values[:3])

        for fname in glob("data/users/lookup*.txt"):
            self.aggregate_lookups(fname)

        for fname in glob("data/users/[0-9]*.txt"):
            self.aggregate_followers(fname)

    def __del__(self):
        # Best-effort cleanup of the seed file handles held open for tailing.
        for fh in self.files:
            fh.close()

    def add_user(self, user_id, user_name, followers_count):
        """Record a user and, if worthwhile, queue a fetch of its followers.

        Users with 5000 or more followers are recorded but never crawled;
        users whose follower file already exists on disk, or whose fetch is
        already in flight, are not re-queued.
        """
        self.users[user_id] = user_name
        if int(followers_count) >= 5000:
            return
        if (not pa.isfile(pa.join("data", "users", user_id + ".txt"))
                and user_id not in self.current_followers):
            self.followers_queue[user_id] = (user_name, followers_count)

    def aggregate_followers(self, filename):
        """Ingest a finished follower file and queue lookups for new ids.

        The file's basename is the id of the user whose fetch completed, so
        it leaves the in-flight set.  Two-field rows already carry a user
        name and only need a "short" lookup; bare ids need a full lookup.
        """
        basename = pa.splitext(pa.basename(filename))[0]
        # discard() is the exception-free equivalent of the original's
        # try/remove/except KeyError.
        self.current_followers.discard(basename)
        for values in get_values(filename):
            if (values[0] not in self.users
                    and values[0] not in self.current_lookups):
                if len(values) == 2:
                    self.short_lookup_queue.add(tuple(values[:2]))
                else:
                    self.lookups_queue.add(values[0])
def aggregate_lookups(self, filename): + for values in get_values(filename): + self.users[values[0]] = values[1] + try: + self.current_lookups.remove(values[0]) + except KeyError: + pass + + def update_servers(self, first=False): + ctime = int(time()) + if self.last_time >= ctime - 15: + return + else: + self.last_time = ctime + + with open(self.filename) as f: + for line in f: + server = line.strip() + self.servers[server] = {} + try: + self.servers[server] = requests.get(server + + "/status").json() + except ConnectionError: + self.servers[server]["dead"] = True + else: + self.servers[server]["dead"] = False + if int(self.servers[server]["done"]) > 0: + self.fetch_archive(server) + if first: + requests.get(server + "/restart") + + def fetch_archive(self, server): + filename = pa.join("data", "users", "archive-" + + str(uuid1()) + ".tar.gz") + urlretrieve(server + "/fetch", filename) + + with tarfile.open(filename, "r:gz") as tar: + tar.extractall() + + def get_tar_filenames(): + with tarfile.open(filename, "r:gz") as tar: + for tarinfo in tar: + if tarinfo.isfile(): + yield tarinfo.name + + for fname in get_tar_filenames(): + if fnmatch(fname, "*data/users/lookup*.txt"): + self.aggregate_lookups(fname) + + for fname in get_tar_filenames(): + if fnmatch(fname, "*data/users/[0-9]*.txt"): + self.aggregate_followers(fname) + + os.remove(filename) + + def clear_followers(self): + shorts = {key: int(value["short"]) + for key, value in self.servers.iteritems() + if not value["dead"]} + longs = {key: int(value["long"]) + for key, value in self.servers.iteritems() + if not value["dead"]} + while len(self.followers_queue) > 0: + if not shorts: + return + key, value = self.followers_queue.popitem() + if (pa.isfile(pa.join("data", "users", key + ".txt")) + or key in self.current_followers): + continue + self.current_followers.add(key) + if int(value[1]) > 1000: + server = min(longs, key=lambda key: longs[key]) + try: + requests.get(server + "/long", params={"id": key}) + 
except ConnectionError: + del shorts[server] + del longs[server] + self.followers_queue[key] = value + self.current_followers.remove(key) + else: + longs[server] += 1 + else: + server = min(shorts, key=lambda key: shorts[key]) + try: + requests.get(server + "/short", + params={"id": key, "user_name": value[0]}) + except ConnectionError: + del shorts[server] + del longs[server] + self.followers_queue[key] = value + self.current_followers.remove(key) + else: + shorts[server] += 1 + + def clear_lookups(self): + lookups = {key: int(value["lookup"]) + for key, value in self.servers.iteritems() + if not value["dead"]} + + while len(self.lookups_queue) > 100: + if not lookups: + return + l = [self.lookups_queue.pop() for _ in xrange(100)] + l = [e for e in l + if e not in self.current_lookups and e not in self.users] + for e in l: + self.current_lookups.add(e) + lstr = ",".join(l) + if lstr: + server = min(lookups, key=lambda key: lookups[key]) + try: + requests.post(server + "/lookup", data={"list": lstr}) + except ConnectionError: + del lookups[server] + for e in l: + self.lookups_queue.add(e) + self.current_lookups.remove(e) + else: + lookups[server] += 1 + + def clear_short_lookups(self): + lookups = {key: int(value["lookup"]) + for key, value in self.servers.iteritems() + if not value["dead"]} + + while len(self.short_lookup_queue) > 100: + if not lookups: + return + l = [self.short_lookup_queue.pop() for _ in xrange(100)] + l = [e for e in l + if e[0] not in self.current_lookups + and e[0] not in self.users] + for e in l: + self.current_lookups.add(e[0]) + lstr = ",".join(itertools.chain.from_iterable(l)) + if lstr: + server = min(lookups, key=lambda key: lookups[key]) + try: + requests.post(server + "/short_lookup", + data={"list": lstr}) + except ConnectionError: + del lookups[server] + for e in l: + self.short_lookup_queue.add(e) + self.current_lookups.remove(e[0]) + else: + lookups[server] += 1 + + def run(self): + while True: + self.update_servers() + for fh in 
self.files: + line = fh.readline() + if line: + values = line.strip().split() + self.add_user(*values[:3]) + break + else: + sleep(0.5) + self.clear_followers() + self.clear_lookups() + self.clear_short_lookups() + print len(self.current_followers), len(self.current_lookups),\ + len(self.followers_queue), len(self.lookups_queue) + + +if __name__ == "__main__": + import sys + try: + dispatcher = Dispatcher(sys.argv[1]) + except IndexError: + print "{0} <server_file>".format(sys.argv[0]) + else: + dispatcher.run() |
