| author | Thibaut Horel <thibaut.horel@gmail.com> | 2014-02-02 16:53:22 -0500 |
|---|---|---|
| committer | Thibaut Horel <thibaut.horel@gmail.com> | 2014-02-02 16:53:22 -0500 |
| commit | 7426d8ff0e7969eb1a86bdb5bec8a0c971309e2b (patch) | |
| tree | 323d6a9a4423b51fbebb37c115fddeab1c7a9641 /dispatcher.py | |
| parent | a0e95b0843d4e366e4b979685f7c821954afebc6 (diff) | |
| download | fast-seeding-7426d8ff0e7969eb1a86bdb5bec8a0c971309e2b.tar.gz | |
Facebook scraping
Diffstat (limited to 'dispatcher.py')
| -rw-r--r-- | dispatcher.py | 247 |
1 file changed, 0 insertions, 247 deletions
```diff
diff --git a/dispatcher.py b/dispatcher.py
deleted file mode 100644
index 56fb9f7..0000000
--- a/dispatcher.py
+++ /dev/null
@@ -1,247 +0,0 @@
-from glob import glob
-import os.path as pa
-from time import sleep, time
-import requests
-from requests.exceptions import ConnectionError
-from urllib import urlretrieve
-from uuid import uuid1
-import tarfile
-import itertools
-from fnmatch import fnmatch
-import os
-
-
-def get_values(filename):
-    with open(filename) as f:
-        for line in f:
-            stripped = line.strip()
-            if stripped:
-                yield line.strip().split()
-
-
-class Dispatcher:
-
-    def __init__(self, filename):
-        self.followers_queue = {}
-        self.lookups_queue = set()
-        self.short_lookup_queue = set()
-        self.current_followers = set()
-        self.current_lookups = set()
-        self.users = {}
-        self.last_time = 0
-        self.servers = {}
-        self.files = [open(fname) for fname in glob("data/*.txt")]
-        self.filename = filename
-        self.update_servers(first=True)
-
-        for fh in self.files:
-            for line in fh:
-                values = line.strip().split()
-                self.add_user(*values[:3])
-
-        for fname in glob("data/users/lookup*.txt"):
-            self.aggregate_lookups(fname)
-
-        for fname in glob("data/users/[0-9]*.txt"):
-            self.aggregate_followers(fname)
-
-    def __del__(self):
-        for fh in self.files:
-            fh.close()
-
-    def add_user(self, user_id, user_name, followers_count):
-        self.users[user_id] = user_name
-        if int(followers_count) >= 5000:
-            return
-        if (not pa.isfile(pa.join("data", "users", user_id + ".txt"))
-                and user_id not in self.current_followers):
-            self.followers_queue[user_id] = (user_name, followers_count)
-
-    def aggregate_followers(self, filename):
-        basename = pa.splitext(pa.basename(filename))[0]
-        try:
-            self.current_followers.remove(basename)
-        except KeyError:
-            pass
-        for values in get_values(filename):
-            if (values[0] not in self.users
-                    and values[0] not in self.current_lookups):
-                if len(values) == 2:
-                    self.short_lookup_queue.add(tuple(values[:2]))
-                else:
-                    self.lookups_queue.add(values[0])
-
-    def aggregate_lookups(self, filename):
-        for values in get_values(filename):
-            self.users[values[0]] = values[1]
-            try:
-                self.current_lookups.remove(values[0])
-            except KeyError:
-                pass
-
-    def update_servers(self, first=False):
-        ctime = int(time())
-        if self.last_time >= ctime - 15:
-            return
-        else:
-            self.last_time = ctime
-
-        with open(self.filename) as f:
-            for line in f:
-                server = line.strip()
-                self.servers[server] = {}
-                try:
-                    self.servers[server] = requests.get(server
-                                                        + "/status").json()
-                except ConnectionError:
-                    self.servers[server]["dead"] = True
-                else:
-                    self.servers[server]["dead"] = False
-                    if int(self.servers[server]["done"]) > 0:
-                        self.fetch_archive(server)
-                    if first:
-                        requests.get(server + "/restart")
-
-    def fetch_archive(self, server):
-        filename = pa.join("data", "users", "archive-"
-                           + str(uuid1()) + ".tar.gz")
-        urlretrieve(server + "/fetch", filename)
-
-        with tarfile.open(filename, "r:gz") as tar:
-            tar.extractall()
-
-        def get_tar_filenames():
-            with tarfile.open(filename, "r:gz") as tar:
-                for tarinfo in tar:
-                    if tarinfo.isfile():
-                        yield tarinfo.name
-
-        for fname in get_tar_filenames():
-            if fnmatch(fname, "*data/users/lookup*.txt"):
-                self.aggregate_lookups(fname)
-
-        for fname in get_tar_filenames():
-            if fnmatch(fname, "*data/users/[0-9]*.txt"):
-                self.aggregate_followers(fname)
-
-        os.remove(filename)
-
-    def clear_followers(self):
-        shorts = {key: int(value["short"])
-                  for key, value in self.servers.iteritems()
-                  if not value["dead"]}
-        longs = {key: int(value["long"])
-                 for key, value in self.servers.iteritems()
-                 if not value["dead"]}
-        while len(self.followers_queue) > 0:
-            if not shorts:
-                return
-            key, value = self.followers_queue.popitem()
-            if (pa.isfile(pa.join("data", "users", key + ".txt"))
-                    or key in self.current_followers):
-                continue
-            self.current_followers.add(key)
-            if int(value[1]) > 1000:
-                server = min(longs, key=lambda key: longs[key])
-                try:
-                    requests.get(server + "/long", params={"id": key})
-                except ConnectionError:
-                    del shorts[server]
-                    del longs[server]
-                    self.followers_queue[key] = value
-                    self.current_followers.remove(key)
-                else:
-                    longs[server] += 1
-            else:
-                server = min(shorts, key=lambda key: shorts[key])
-                try:
-                    requests.get(server + "/short",
-                                 params={"id": key, "user_name": value[0]})
-                except ConnectionError:
-                    del shorts[server]
-                    del longs[server]
-                    self.followers_queue[key] = value
-                    self.current_followers.remove(key)
-                else:
-                    shorts[server] += 1
-
-    def clear_lookups(self):
-        lookups = {key: int(value["lookup"])
-                   for key, value in self.servers.iteritems()
-                   if not value["dead"]}
-
-        while len(self.lookups_queue) > 100:
-            if not lookups:
-                return
-            l = [self.lookups_queue.pop() for _ in xrange(100)]
-            l = [e for e in l
-                 if e not in self.current_lookups and e not in self.users]
-            for e in l:
-                self.current_lookups.add(e)
-            lstr = ",".join(l)
-            if lstr:
-                server = min(lookups, key=lambda key: lookups[key])
-                try:
-                    requests.post(server + "/lookup", data={"list": lstr})
-                except ConnectionError:
-                    del lookups[server]
-                    for e in l:
-                        self.lookups_queue.add(e)
-                        self.current_lookups.remove(e)
-                else:
-                    lookups[server] += 1
-
-    def clear_short_lookups(self):
-        lookups = {key: int(value["lookup"])
-                   for key, value in self.servers.iteritems()
-                   if not value["dead"]}
-
-        while len(self.short_lookup_queue) > 100:
-            if not lookups:
-                return
-            l = [self.short_lookup_queue.pop() for _ in xrange(100)]
-            l = [e for e in l
-                 if e[0] not in self.current_lookups
-                 and e[0] not in self.users]
-            for e in l:
-                self.current_lookups.add(e[0])
-            lstr = ",".join(itertools.chain.from_iterable(l))
-            if lstr:
-                server = min(lookups, key=lambda key: lookups[key])
-                try:
-                    requests.post(server + "/short_lookup",
-                                  data={"list": lstr})
-                except ConnectionError:
-                    del lookups[server]
-                    for e in l:
-                        self.short_lookup_queue.add(e)
-                        self.current_lookups.remove(e[0])
-                else:
-                    lookups[server] += 1
-
-    def run(self):
-        while True:
-            self.update_servers()
-            for fh in self.files:
-                line = fh.readline()
-                if line:
-                    values = line.strip().split()
-                    self.add_user(*values[:3])
-                    break
-            else:
-                sleep(0.5)
-            self.clear_followers()
-            self.clear_lookups()
-            self.clear_short_lookups()
-            print len(self.current_followers), len(self.current_lookups),\
-                len(self.followers_queue), len(self.lookups_queue)
-
-
-if __name__ == "__main__":
-    import sys
-    try:
-        dispatcher = Dispatcher(sys.argv[1])
-    except IndexError:
-        print "{0} <server_file>".format(sys.argv[0])
-    else:
-        dispatcher.run()
```
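The deleted module was invoked as `python dispatcher.py <server_file>` and implemented one recurring pattern: poll each scraper server's `/status` endpoint, send each queued job to the least-loaded live server, and requeue the job if the connection fails. Below is a minimal Python 3 sketch (not part of the commit) of that pattern; the `/status` and `/lookup` endpoint names mirror the deleted code, while `poll_servers`, `dispatch`, and the `load_key` parameter are illustrative names, not anything from this repository.

```python
# Sketch of the dispatch pattern used by the deleted dispatcher.py,
# rewritten for Python 3. Endpoint names follow the original; the
# function and parameter names here are hypothetical.
import requests
from requests.exceptions import ConnectionError as RequestsConnectionError


def poll_servers(server_urls):
    """Return {url: status_dict} for servers that answered /status."""
    statuses = {}
    for url in server_urls:
        try:
            statuses[url] = requests.get(url + "/status", timeout=5).json()
        except RequestsConnectionError:
            pass  # dead server: simply excluded from this dispatch round
    return statuses


def dispatch(queue, statuses, load_key="lookup"):
    """Pop jobs from `queue` (a list) and send each to the least-loaded server."""
    loads = {url: int(s[load_key]) for url, s in statuses.items()}
    while queue and loads:
        job = queue.pop()
        server = min(loads, key=loads.get)  # least-loaded live server
        try:
            requests.post(server + "/lookup", data={"list": job}, timeout=5)
        except RequestsConnectionError:
            del loads[server]   # drop the failed server for this round
            queue.append(job)   # requeue so the job is not lost
        else:
            loads[server] += 1  # account for the job just assigned
```

As in the original `clear_*` methods, a server that raises a connection error is removed from the candidate pool for the remainder of the round and its job goes back on the queue, so a crashed scraper loses no work.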
