author     Thibaut Horel <thibaut.horel@gmail.com>  2014-02-02 16:53:22 -0500
committer  Thibaut Horel <thibaut.horel@gmail.com>  2014-02-02 16:53:22 -0500
commit     7426d8ff0e7969eb1a86bdb5bec8a0c971309e2b (patch)
tree       323d6a9a4423b51fbebb37c115fddeab1c7a9641 /dispatcher.py
parent     a0e95b0843d4e366e4b979685f7c821954afebc6 (diff)
download   fast-seeding-7426d8ff0e7969eb1a86bdb5bec8a0c971309e2b.tar.gz
Facebook scraping
Diffstat (limited to 'dispatcher.py')
-rw-r--r--  dispatcher.py  247
1 file changed, 0 insertions(+), 247 deletions(-)
diff --git a/dispatcher.py b/dispatcher.py
deleted file mode 100644
index 56fb9f7..0000000
--- a/dispatcher.py
+++ /dev/null
@@ -1,247 +0,0 @@
-from glob import glob
-import os.path as pa
-from time import sleep, time
-import requests
-from requests.exceptions import ConnectionError
-from urllib import urlretrieve
-from uuid import uuid1
-import tarfile
-import itertools
-from fnmatch import fnmatch
-import os
-
-
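-# Yield the whitespace-separated fields of every non-blank line in a file.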
-def get_values(filename):
-    with open(filename) as f:
-        for line in f:
-            stripped = line.strip()
-            if stripped:
-                yield stripped.split()
-
-
-class Dispatcher:
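-    # Feeds seed users from data/*.txt to a pool of HTTP scraping servers,
-    # balances follower and lookup jobs across them, and merges the result
-    # files they send back.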
-
-    def __init__(self, filename):
-        self.followers_queue = {}
-        self.lookups_queue = set()
-        self.short_lookup_queue = set()
-        self.current_followers = set()
-        self.current_lookups = set()
-        self.users = {}
-        self.last_time = 0
-        self.servers = {}
-        self.files = [open(fname) for fname in glob("data/*.txt")]
-        self.filename = filename
-        self.update_servers(first=True)
-
-        for fh in self.files:
-            for line in fh:
-                values = line.strip().split()
-                self.add_user(*values[:3])
-
-        for fname in glob("data/users/lookup*.txt"):
-            self.aggregate_lookups(fname)
-
-        for fname in glob("data/users/[0-9]*.txt"):
-            self.aggregate_followers(fname)
-
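-    # Close the seed files when the dispatcher is destroyed.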
-    def __del__(self):
-        for fh in self.files:
-            fh.close()
-
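-    # Record a user's name and queue a follower fetch, skipping accounts
-    # with 5000+ followers and users already fetched or in flight.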
-    def add_user(self, user_id, user_name, followers_count):
-        self.users[user_id] = user_name
-        if int(followers_count) >= 5000:
-            return
-        if (not pa.isfile(pa.join("data", "users", user_id + ".txt"))
-            and user_id not in self.current_followers):
-            self.followers_queue[user_id] = (user_name, followers_count)
-
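-    # Merge a completed follower file: mark the job done and queue any ids
-    # we have not seen yet for lookup (two-field lines go to the
-    # short-lookup queue).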
-    def aggregate_followers(self, filename):
-        basename = pa.splitext(pa.basename(filename))[0]
-        try:
-            self.current_followers.remove(basename)
-        except KeyError:
-            pass
-        for values in get_values(filename):
-            if (values[0] not in self.users
-                and values[0] not in self.current_lookups):
-                if len(values) == 2:
-                    self.short_lookup_queue.add(tuple(values[:2]))
-                else:
-                    self.lookups_queue.add(values[0])
-
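-    # Merge a completed lookup file: record each id -> name pair and mark
-    # the corresponding lookup as done.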
-    def aggregate_lookups(self, filename):
-        for values in get_values(filename):
-            self.users[values[0]] = values[1]
-            try:
-                self.current_lookups.remove(values[0])
-            except KeyError:
-                pass
-
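-    # Poll every server's /status endpoint (at most once every 15 seconds),
-    # flag unreachable servers as dead, pull finished result archives, and
-    # restart the workers on the first call.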
-    def update_servers(self, first=False):
-        ctime = int(time())
-        if self.last_time >= ctime - 15:
-            return
-        else:
-            self.last_time = ctime
-
-        with open(self.filename) as f:
-            for line in f:
-                server = line.strip()
-                self.servers[server] = {}
-                try:
-                    self.servers[server] = requests.get(server
-                                                        + "/status").json()
-                except ConnectionError:
-                    self.servers[server]["dead"] = True
-                else:
-                    self.servers[server]["dead"] = False
-                    if int(self.servers[server]["done"]) > 0:
-                        self.fetch_archive(server)
-                    if first:
-                        requests.get(server + "/restart")
-
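-    # Download a server's results via /fetch, extract the tarball, merge
-    # lookup files before follower files, then delete the archive.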
-    def fetch_archive(self, server):
-        filename = pa.join("data", "users", "archive-"
-                           + str(uuid1()) + ".tar.gz")
-        urlretrieve(server + "/fetch", filename)
-
-        with tarfile.open(filename, "r:gz") as tar:
-            tar.extractall()
-
-        def get_tar_filenames():
-            with tarfile.open(filename, "r:gz") as tar:
-                for tarinfo in tar:
-                    if tarinfo.isfile():
-                        yield tarinfo.name
-
-        for fname in get_tar_filenames():
-            if fnmatch(fname, "*data/users/lookup*.txt"):
-                self.aggregate_lookups(fname)
-
-        for fname in get_tar_filenames():
-            if fnmatch(fname, "*data/users/[0-9]*.txt"):
-                self.aggregate_followers(fname)
-
-        os.remove(filename)
-
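-    # Drain the followers queue, sending each job to the live server with
-    # the smallest reported backlog: /long for users with more than 1000
-    # followers, /short otherwise. Jobs that hit a dead server are re-queued.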
-    def clear_followers(self):
-        shorts = {key: int(value["short"])
-                  for key, value in self.servers.iteritems()
-                  if not value["dead"]}
-        longs = {key: int(value["long"])
-                 for key, value in self.servers.iteritems()
-                 if not value["dead"]}
-        while len(self.followers_queue) > 0:
-            if not shorts:
-                return
-            key, value = self.followers_queue.popitem()
-            if (pa.isfile(pa.join("data", "users", key + ".txt"))
-                or key in self.current_followers):
-                continue
-            self.current_followers.add(key)
-            if int(value[1]) > 1000:
-                server = min(longs, key=longs.get)
-                try:
-                    requests.get(server + "/long", params={"id": key})
-                except ConnectionError:
-                    del shorts[server]
-                    del longs[server]
-                    self.followers_queue[key] = value
-                    self.current_followers.remove(key)
-                else:
-                    longs[server] += 1
-            else:
-                server = min(shorts, key=shorts.get)
-                try:
-                    requests.get(server + "/short",
-                                 params={"id": key, "user_name": value[0]})
-                except ConnectionError:
-                    del shorts[server]
-                    del longs[server]
-                    self.followers_queue[key] = value
-                    self.current_followers.remove(key)
-                else:
-                    shorts[server] += 1
-
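-    # Send queued lookups to the least-loaded server's /lookup endpoint in
-    # comma-separated batches of 100, re-queueing the batch on failure.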
-    def clear_lookups(self):
-        lookups = {key: int(value["lookup"])
-                   for key, value in self.servers.iteritems()
-                   if not value["dead"]}
-
-        while len(self.lookups_queue) > 100:
-            if not lookups:
-                return
-            batch = [self.lookups_queue.pop() for _ in xrange(100)]
-            batch = [e for e in batch
-                     if e not in self.current_lookups and e not in self.users]
-            for e in batch:
-                self.current_lookups.add(e)
-            lstr = ",".join(batch)
-            if lstr:
-                server = min(lookups, key=lookups.get)
-                try:
-                    requests.post(server + "/lookup", data={"list": lstr})
-                except ConnectionError:
-                    del lookups[server]
-                    for e in batch:
-                        self.lookups_queue.add(e)
-                        self.current_lookups.remove(e)
-                else:
-                    lookups[server] += 1
-
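-    # Same batching as clear_lookups, but for (id, name) pairs posted to
-    # /short_lookup.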
-    def clear_short_lookups(self):
-        lookups = {key: int(value["lookup"])
-                   for key, value in self.servers.iteritems()
-                   if not value["dead"]}
-
-        while len(self.short_lookup_queue) > 100:
-            if not lookups:
-                return
-            batch = [self.short_lookup_queue.pop() for _ in xrange(100)]
-            batch = [e for e in batch
-                     if e[0] not in self.current_lookups
-                     and e[0] not in self.users]
-            for e in batch:
-                self.current_lookups.add(e[0])
-            lstr = ",".join(itertools.chain.from_iterable(batch))
-            if lstr:
-                server = min(lookups, key=lookups.get)
-                try:
-                    requests.post(server + "/short_lookup",
-                                  data={"list": lstr})
-                except ConnectionError:
-                    del lookups[server]
-                    for e in batch:
-                        self.short_lookup_queue.add(e)
-                        self.current_lookups.remove(e[0])
-                else:
-                    lookups[server] += 1
-
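-    # Main loop: refresh server state, feed at most one new seed user per
-    # iteration (sleeping briefly when all files are exhausted), then flush
-    # the three queues.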
-    def run(self):
-        while True:
-            self.update_servers()
-            for fh in self.files:
-                line = fh.readline()
-                if line:
-                    values = line.strip().split()
-                    self.add_user(*values[:3])
-                    break
-            else:
-                sleep(0.5)
-            self.clear_followers()
-            self.clear_lookups()
-            self.clear_short_lookups()
-            print len(self.current_followers), len(self.current_lookups),\
-                len(self.followers_queue), len(self.lookups_queue)
-
-
-if __name__ == "__main__":
-    import sys
-    try:
-        dispatcher = Dispatcher(sys.argv[1])
-    except IndexError:
-        print "{0} <server_file>".format(sys.argv[0])
-    else:
-        dispatcher.run()