"""Dispatcher for a small fleet of HTTP crawler workers.

Tails the seed files ``data/*.txt`` (one ``user_id user_name
followers_count`` record per line), queues follower fetches and name
lookups, and spreads the work over the worker servers listed in the file
given on the command line (one base URL per line). Finished results are
pulled back from the workers as tar.gz archives and folded into the queues:
``data/users/<id>.txt`` files hold follower lists, ``data/users/lookup*.txt``
files hold ``user_id user_name`` pairs.
"""

from glob import glob
import os
import os.path as pa
from time import sleep, time
import itertools
import tarfile
from fnmatch import fnmatch
from urllib import urlretrieve
from uuid import uuid1

import requests
from requests.exceptions import ConnectionError


def get_values(filename):
    """Yield the whitespace-split fields of every non-blank line."""
    with open(filename) as f:
        for line in f:
            stripped = line.strip()
            if stripped:
                yield stripped.split()


class Dispatcher:

    def __init__(self, filename):
        self.followers_queue = {}
        self.lookups_queue = set()
        self.short_lookup_queue = set()
        self.current_followers = set()  # follower fetches in flight
        self.current_lookups = set()    # name lookups in flight
        self.users = {}                 # user_id -> user_name
        self.last_time = 0
        self.servers = {}
        self.files = [open(fname) for fname in glob("data/*.txt")]
        self.filename = filename
        self.update_servers(first=True)
        # Seed the queues from whatever is already on disk.
        for fh in self.files:
            for line in fh:
                values = line.strip().split()
                self.add_user(*values[:3])
        for fname in glob("data/users/lookup*.txt"):
            self.aggregate_lookups(fname)
        for fname in glob("data/users/[0-9]*.txt"):
            self.aggregate_followers(fname)

    def __del__(self):
        for fh in self.files:
            fh.close()

    def add_user(self, user_id, user_name, followers_count):
        """Record a user and queue a follower fetch for accounts under 5000."""
        self.users[user_id] = user_name
        if int(followers_count) >= 5000:
            return
        if (not pa.isfile(pa.join("data", "users", user_id + ".txt"))
                and user_id not in self.current_followers):
            self.followers_queue[user_id] = (user_name, followers_count)

    def aggregate_followers(self, filename):
        """Fold a finished follower file into the lookup queues."""
        basename = pa.splitext(pa.basename(filename))[0]
        try:
            self.current_followers.remove(basename)
        except KeyError:
            pass
        for values in get_values(filename):
            if (values[0] not in self.users
                    and values[0] not in self.current_lookups):
                if len(values) == 2:
                    # id and name both present: a short lookup suffices
                    self.short_lookup_queue.add(tuple(values[:2]))
                else:
                    self.lookups_queue.add(values[0])

    def aggregate_lookups(self, filename):
        """Fold a finished lookup file into the id -> name map."""
        for values in get_values(filename):
            self.users[values[0]] = values[1]
            try:
                self.current_lookups.remove(values[0])
            except KeyError:
                pass

    def update_servers(self, first=False):
        """Poll every server's /status, at most once every 15 seconds."""
        ctime = int(time())
        if self.last_time >= ctime - 15:
            return
        self.last_time = ctime
        with open(self.filename) as f:
            for line in f:
                server = line.strip()
                self.servers[server] = {}
                try:
                    self.servers[server] = \
                        requests.get(server + "/status").json()
                except ConnectionError:
                    self.servers[server]["dead"] = True
                else:
                    self.servers[server]["dead"] = False
                    if int(self.servers[server]["done"]) > 0:
                        self.fetch_archive(server)
                    if first:
                        requests.get(server + "/restart")

    def fetch_archive(self, server):
        """Download a server's finished results and fold them in."""
        filename = pa.join("data", "users",
                           "archive-" + str(uuid1()) + ".tar.gz")
        urlretrieve(server + "/fetch", filename)
        with tarfile.open(filename, "r:gz") as tar:
            tar.extractall()

        def get_tar_filenames():
            with tarfile.open(filename, "r:gz") as tar:
                for tarinfo in tar:
                    if tarinfo.isfile():
                        yield tarinfo.name

        # Lookups first, so follower aggregation sees the fresh names.
        for fname in get_tar_filenames():
            if fnmatch(fname, "*data/users/lookup*.txt"):
                self.aggregate_lookups(fname)
        for fname in get_tar_filenames():
            if fnmatch(fname, "*data/users/[0-9]*.txt"):
                self.aggregate_followers(fname)
        os.remove(filename)

    def clear_followers(self):
        """Dispatch queued follower fetches to the least-loaded live server."""
        shorts = {key: int(value["short"])
                  for key, value in self.servers.iteritems()
                  if not value["dead"]}
        longs = {key: int(value["long"])
                 for key, value in self.servers.iteritems()
                 if not value["dead"]}
        while len(self.followers_queue) > 0:
            if not shorts:
                return
            key, value = self.followers_queue.popitem()
            if (pa.isfile(pa.join("data", "users", key + ".txt"))
                    or key in self.current_followers):
                continue
            self.current_followers.add(key)
            if int(value[1]) > 1000:
                # Big accounts go through the paginated "long" fetch.
                server = min(longs, key=longs.get)
                try:
                    requests.get(server + "/long", params={"id": key})
                except ConnectionError:
                    # Server is gone: drop it and requeue the job.
                    del shorts[server]
                    del longs[server]
                    self.followers_queue[key] = value
                    self.current_followers.remove(key)
                else:
                    longs[server] += 1
            else:
                server = min(shorts, key=shorts.get)
                try:
                    requests.get(server + "/short",
                                 params={"id": key, "user_name": value[0]})
                except ConnectionError:
                    del shorts[server]
                    del longs[server]
                    self.followers_queue[key] = value
                    self.current_followers.remove(key)
                else:
                    shorts[server] += 1

    def clear_lookups(self):
        """Dispatch queued id-only lookups in batches of 100."""
        lookups = {key: int(value["lookup"])
                   for key, value in self.servers.iteritems()
                   if not value["dead"]}
        while len(self.lookups_queue) > 100:
            if not lookups:
                return
            l = [self.lookups_queue.pop() for _ in xrange(100)]
            l = [e for e in l
                 if e not in self.current_lookups and e not in self.users]
            for e in l:
                self.current_lookups.add(e)
            lstr = ",".join(l)
            if lstr:
                server = min(lookups, key=lookups.get)
                try:
                    requests.post(server + "/lookup", data={"list": lstr})
                except ConnectionError:
                    del lookups[server]
                    for e in l:
                        self.lookups_queue.add(e)
                        self.current_lookups.remove(e)
                else:
                    lookups[server] += 1

    def clear_short_lookups(self):
        """Dispatch queued (id, name) lookups in batches of 100."""
        lookups = {key: int(value["lookup"])
                   for key, value in self.servers.iteritems()
                   if not value["dead"]}
        while len(self.short_lookup_queue) > 100:
            if not lookups:
                return
            l = [self.short_lookup_queue.pop() for _ in xrange(100)]
            l = [e for e in l
                 if e[0] not in self.current_lookups
                 and e[0] not in self.users]
            for e in l:
                self.current_lookups.add(e[0])
            lstr = ",".join(itertools.chain.from_iterable(l))
            if lstr:
                server = min(lookups, key=lookups.get)
                try:
                    requests.post(server + "/short_lookup",
                                  data={"list": lstr})
                except ConnectionError:
                    del lookups[server]
                    for e in l:
                        self.short_lookup_queue.add(e)
                        self.current_lookups.remove(e[0])
                else:
                    lookups[server] += 1

    def run(self):
        """Main loop: poll servers, feed one new seed line, drain the queues."""
        while True:
            self.update_servers()
            for fh in self.files:
                line = fh.readline()
                if line:
                    values = line.strip().split()
                    self.add_user(*values[:3])
                    break
            else:
                sleep(0.5)  # no seed file had a new line; back off briefly
            self.clear_followers()
            self.clear_lookups()
            self.clear_short_lookups()
            print len(self.current_followers), len(self.current_lookups),\
                len(self.followers_queue), len(self.lookups_queue)


if __name__ == "__main__":
    import sys
    try:
        dispatcher = Dispatcher(sys.argv[1])
    except IndexError:
        print "usage: {0} <server_list_file>".format(sys.argv[0])
    else:
        dispatcher.run()