path: root/dispatcher.py
Diffstat (limited to 'dispatcher.py')
-rw-r--r--  dispatcher.py  247
1 file changed, 247 insertions, 0 deletions
diff --git a/dispatcher.py b/dispatcher.py
new file mode 100644
index 0000000..56fb9f7
--- /dev/null
+++ b/dispatcher.py
@@ -0,0 +1,247 @@
+from glob import glob
+import os.path as pa
+from time import sleep, time
+import requests
+from requests.exceptions import ConnectionError
+from urllib import urlretrieve
+from uuid import uuid1
+import tarfile
+import itertools
+from fnmatch import fnmatch
+import os
+
+
+def get_values(filename):
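+    """Yield the whitespace-split fields of every non-blank line in filename."""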
+ with open(filename) as f:
+ for line in f:
+ stripped = line.strip()
+ if stripped:
+                yield stripped.split()
+
+
+class Dispatcher:
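+    """Dispatch follower and lookup crawl jobs across a pool of worker servers.
+
+    The worker list is read from the file passed to the constructor; each
+    worker is polled through its /status endpoint and queued work is sent
+    to the least loaded live worker.
+    """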
+
+ def __init__(self, filename):
+ self.followers_queue = {}
+ self.lookups_queue = set()
+ self.short_lookup_queue = set()
+ self.current_followers = set()
+ self.current_lookups = set()
+ self.users = {}
+ self.last_time = 0
+ self.servers = {}
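+        # Keep the input files open so run() can keep tailing them for new users.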
+ self.files = [open(fname) for fname in glob("data/*.txt")]
+ self.filename = filename
+ self.update_servers(first=True)
+
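+        # Load users from the input files, then replay any lookup and
+        # follower results left on disk by earlier runs.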
+ for fh in self.files:
+ for line in fh:
+ values = line.strip().split()
+ self.add_user(*values[:3])
+
+ for fname in glob("data/users/lookup*.txt"):
+ self.aggregate_lookups(fname)
+
+ for fname in glob("data/users/[0-9]*.txt"):
+ self.aggregate_followers(fname)
+
+ def __del__(self):
+ for fh in self.files:
+ fh.close()
+
+ def add_user(self, user_id, user_name, followers_count):
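+        """Record a user and queue a follower fetch unless that fetch is
+        already done or in flight, or the account has 5000+ followers."""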
+ self.users[user_id] = user_name
+ if int(followers_count) >= 5000:
+ return
+ if (not pa.isfile(pa.join("data", "users", user_id + ".txt"))
+ and user_id not in self.current_followers):
+ self.followers_queue[user_id] = (user_name, followers_count)
+
+ def aggregate_followers(self, filename):
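+        """Mark the follower fetch named by the file as finished, then queue
+        any user IDs it contains that are still unknown; two-field entries
+        go to the short lookup queue."""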
+ basename = pa.splitext(pa.basename(filename))[0]
+ try:
+ self.current_followers.remove(basename)
+ except KeyError:
+ pass
+ for values in get_values(filename):
+ if (values[0] not in self.users
+ and values[0] not in self.current_lookups):
+ if len(values) == 2:
+ self.short_lookup_queue.add(tuple(values[:2]))
+ else:
+ self.lookups_queue.add(values[0])
+
+ def aggregate_lookups(self, filename):
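+        """Ingest a lookup-result file, recording user names and clearing
+        the matching entries from the in-flight lookup set."""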
+ for values in get_values(filename):
+ self.users[values[0]] = values[1]
+ try:
+ self.current_lookups.remove(values[0])
+ except KeyError:
+ pass
+
+ def update_servers(self, first=False):
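+        """Poll each server's /status endpoint, at most once every 15 seconds.
+
+        Unreachable servers are flagged as dead, finished results are pulled
+        in with fetch_archive, and on the very first call each server is
+        asked to /restart."""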
+ ctime = int(time())
+ if self.last_time >= ctime - 15:
+ return
+ else:
+ self.last_time = ctime
+
+ with open(self.filename) as f:
+ for line in f:
+ server = line.strip()
+ self.servers[server] = {}
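+                # The status payload is assumed to carry the counters read
+                # here and in the clear_* methods, roughly:
+                # {"done": 0, "short": 2, "long": 1, "lookup": 5}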
+ try:
+ self.servers[server] = requests.get(server
+ + "/status").json()
+ except ConnectionError:
+ self.servers[server]["dead"] = True
+ else:
+ self.servers[server]["dead"] = False
+ if int(self.servers[server]["done"]) > 0:
+ self.fetch_archive(server)
+ if first:
+ requests.get(server + "/restart")
+
+ def fetch_archive(self, server):
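+        """Download a server's result archive, extract it, fold the contained
+        lookup and follower files into the queues, then delete the archive."""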
+ filename = pa.join("data", "users", "archive-"
+ + str(uuid1()) + ".tar.gz")
+ urlretrieve(server + "/fetch", filename)
+
+ with tarfile.open(filename, "r:gz") as tar:
+ tar.extractall()
+
+ def get_tar_filenames():
+ with tarfile.open(filename, "r:gz") as tar:
+ for tarinfo in tar:
+ if tarinfo.isfile():
+ yield tarinfo.name
+
+ for fname in get_tar_filenames():
+ if fnmatch(fname, "*data/users/lookup*.txt"):
+ self.aggregate_lookups(fname)
+
+ for fname in get_tar_filenames():
+ if fnmatch(fname, "*data/users/[0-9]*.txt"):
+ self.aggregate_followers(fname)
+
+ os.remove(filename)
+
+ def clear_followers(self):
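+        """Drain the follower queue onto the least loaded live servers:
+        /long for users with more than 1000 followers, /short otherwise.
+        A server that stops responding is dropped and its job requeued."""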
+ shorts = {key: int(value["short"])
+ for key, value in self.servers.iteritems()
+ if not value["dead"]}
+ longs = {key: int(value["long"])
+ for key, value in self.servers.iteritems()
+ if not value["dead"]}
+ while len(self.followers_queue) > 0:
+ if not shorts:
+ return
+ key, value = self.followers_queue.popitem()
+ if (pa.isfile(pa.join("data", "users", key + ".txt"))
+ or key in self.current_followers):
+ continue
+ self.current_followers.add(key)
+ if int(value[1]) > 1000:
+                server = min(longs, key=longs.get)
+ try:
+ requests.get(server + "/long", params={"id": key})
+ except ConnectionError:
+ del shorts[server]
+ del longs[server]
+ self.followers_queue[key] = value
+ self.current_followers.remove(key)
+ else:
+ longs[server] += 1
+ else:
+                server = min(shorts, key=shorts.get)
+ try:
+ requests.get(server + "/short",
+ params={"id": key, "user_name": value[0]})
+ except ConnectionError:
+ del shorts[server]
+ del longs[server]
+ self.followers_queue[key] = value
+ self.current_followers.remove(key)
+ else:
+ shorts[server] += 1
+
+ def clear_lookups(self):
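+        """Post queued lookups to the least loaded live server via /lookup,
+        100 user IDs at a time, while more than 100 are waiting."""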
+ lookups = {key: int(value["lookup"])
+ for key, value in self.servers.iteritems()
+ if not value["dead"]}
+
+ while len(self.lookups_queue) > 100:
+ if not lookups:
+ return
+            batch = [self.lookups_queue.pop() for _ in xrange(100)]
+            batch = [e for e in batch
+                     if e not in self.current_lookups and e not in self.users]
+            for e in batch:
+                self.current_lookups.add(e)
+            lstr = ",".join(batch)
+ if lstr:
+                server = min(lookups, key=lookups.get)
+ try:
+ requests.post(server + "/lookup", data={"list": lstr})
+ except ConnectionError:
+ del lookups[server]
+                    for e in batch:
+ self.lookups_queue.add(e)
+ self.current_lookups.remove(e)
+ else:
+ lookups[server] += 1
+
+ def clear_short_lookups(self):
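+        """Like clear_lookups, but posts two-field entries from the short
+        lookup queue to /short_lookup, 100 at a time."""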
+ lookups = {key: int(value["lookup"])
+ for key, value in self.servers.iteritems()
+ if not value["dead"]}
+
+ while len(self.short_lookup_queue) > 100:
+ if not lookups:
+ return
+            batch = [self.short_lookup_queue.pop() for _ in xrange(100)]
+            batch = [e for e in batch
+                     if e[0] not in self.current_lookups
+                     and e[0] not in self.users]
+            for e in batch:
+                self.current_lookups.add(e[0])
+            lstr = ",".join(itertools.chain.from_iterable(batch))
+ if lstr:
+                server = min(lookups, key=lookups.get)
+ try:
+ requests.post(server + "/short_lookup",
+ data={"list": lstr})
+ except ConnectionError:
+ del lookups[server]
+                    for e in batch:
+ self.short_lookup_queue.add(e)
+ self.current_lookups.remove(e[0])
+ else:
+ lookups[server] += 1
+
+ def run(self):
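+        """Main loop: refresh server status, tail the input files for new
+        users, dispatch queued work, and print progress counters."""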
+ while True:
+ self.update_servers()
+ for fh in self.files:
+ line = fh.readline()
+ if line:
+ values = line.strip().split()
+ self.add_user(*values[:3])
+ break
+ else:
+ sleep(0.5)
+ self.clear_followers()
+ self.clear_lookups()
+ self.clear_short_lookups()
+ print len(self.current_followers), len(self.current_lookups),\
+ len(self.followers_queue), len(self.lookups_queue)
+
+
+if __name__ == "__main__":
+ import sys
+ try:
+ dispatcher = Dispatcher(sys.argv[1])
+ except IndexError:
+ print "{0} <server_file>".format(sys.argv[0])
+ else:
+ dispatcher.run()