| -rw-r--r-- | Makefile              |  13 |
| -rw-r--r-- | api.py                | 101 |
| -rw-r--r-- | api_accounts.txt      |   2 |
| -rw-r--r-- | concepts.txt          |  17 |
| -rw-r--r-- | dispatcher.py         | 247 |
| -rw-r--r-- | main.py               | 182 |
| -rw-r--r-- | scraper.py            |  86 |
| -rw-r--r-- | scraping_accounts.txt |   2 |
| -rw-r--r-- | servers.txt           |   1 |
| -rw-r--r-- | stream.py             |  62 |
10 files changed, 713 insertions, 0 deletions
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..0209047
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,13 @@
+PORT=8111
+PYTHON=python2
+DATADIR=data/users
+
+$(DATADIR):
+	mkdir data
+	mkdir data/users
+
+run: | $(DATADIR)
+	$(PYTHON) main.py $(PORT) 2> errors.log
+
+archive: | $(DATADIR)
+	tar --force-local -czf archive-$$(date -Is).tar.gz data/
diff --git a/api.py b/api.py
new file mode 100644
--- /dev/null
+++ b/api.py
@@ -0,0 +1,101 @@
+from tweepy import API, OAuthHandler
+from tweepy import cursor
+from bs4 import BeautifulSoup
+
+import os.path
+import uuid
+from time import time, sleep
+from urllib import urlopen
+
+
+class RequestHandler:
+
+    def __init__(self, *args):
+        auth = OAuthHandler(*args[0:2])
+        auth.set_access_token(*args[2:])
+        self.api = API(auth)
+        self.state = {}
+        limits = self.api.rate_limit_status()
+        self.state["followers"] = limits["resources"]["followers"]["/followers/ids"]
+        self.state["lookup"] = limits["resources"]["users"]["/users/lookup"]
+
+    def __get_followers(self, user_id):
+        pages = cursor.Cursor(self.api.followers_ids, id=user_id).pages(1)
+        for page in pages:
+            for follower in page:
+                yield follower
+
+    def get_followers(self, user_id):
+        filename = os.path.join("data", "users", user_id + ".txt")
+        if os.path.isfile(filename):
+            return filename
+        l = list(self.__get_followers(user_id))
+        with open(filename, "w") as f:
+            for fid in l:
+                f.write(str(fid) + "\n")
+        for key, value in self.api.last_response.getheaders():
+            if key.startswith("x-rate-limit"):
+                self.state["followers"][key.split("-")[-1]] = int(value)
+        return filename
+
+    def __lookup(self, users_list):
+        for user in self.api.lookup_users(users_list):
+            yield user
+
+    def lookup(self, users_list):
+        uid = uuid.uuid1()
+        filename = os.path.join("data", "users", "lookup-" + str(uid) + ".txt")
+        l = list(self.__lookup(users_list))
+        with open(filename, "w") as f:
+            for user in l:
+                output = " ".join([str(user.id), user.screen_name,
+                                   str(user.followers_count),
+                                   str(user.friends_count)])
+                f.write(output + "\n")
+        for key, value in self.api.last_response.getheaders():
+            if key.startswith("x-rate-limit"):
+                self.state["lookup"][key.split("-")[-1]] = int(value)
+        return filename
+
+    def get_profile(self, user_id, username):
+        fh = urlopen("https://twitter.com/{0}".format(username))
+        soup = BeautifulSoup(fh)
+        ul = soup.find("ul", class_="js-mini-profile-stats")
+        following, followers = [li.strong.string
+                                for li in ul.find_all("li")[1:]]
+        return user_id, username, followers, following
+
+    def short_lookup(self, users_list):
+        uid = uuid.uuid1()
+        filename = os.path.join("data", "users", "lookup-" + str(uid) + ".txt")
+
+        def get_output():
+            for user_id, username in users_list:
+                output = " ".join(map(str, self.get_profile(user_id,
+                                                            username)))
+                yield output
+                sleep(0.5)
+
+        to_write = list(get_output())
+        with open(filename, "w") as f:
+            f.write("\n".join(to_write))
+
+        return filename
+
+    def ready(self, method):
+        now = int(time())
+        if (int(self.state[method]["remaining"]) > 0
+                or int(self.state[method]["reset"]) < now):
+            return True
+        else:
+            return False
+
+
+if __name__ == "__main__":
+    credentials = open("api_accounts.txt").readline().strip().split()
+    handler = RequestHandler(*credentials[2:])
+    # if handler.ready("lookup"):
+    #     handler.lookup(["304224106"])
+    # if handler.ready("followers"):
+    #     handler.lookup("304224106")
+    print handler.short_lookup([("000", "thibauthorel")])
diff --git a/api_accounts.txt b/api_accounts.txt
new file mode 100644
index 0000000..263f2e8
--- /dev/null
+++ b/api_accounts.txt
@@ -0,0 +1,2 @@
+thibaut.horel@gmail.com Dlmatc06 GT3ILinlqcuChZY2ueOb1Q 9Jx9WGyfNea35X2kYCAN8hh9WkZl6wD7b4yXkY 2291723059-dvaHVGA50FYgDtxxZZQoBU0MQYysdaYOFIyOeLa 70GdBOKCIQWliX1hllfgmek2vEvrnKBqm0bBfApbP38TO
+zaran.krleza+1@gmail.com i6rkXWj78 Fle9xRwFyXO3SV7zR7KDg 0rAzjUo6yyx0DtHR6EvIQPenynJKmLKgPvyGRqj4w 2304251221-ztXyr6HFBOuDbPiWqFQT3wWAQfW6iEw7RoQXrwW 6xf5T89H4wneiiSskuRtL8GWHhK0g84CNmPdCeAOiXCP8
diff --git a/concepts.txt b/concepts.txt
new file mode 100644
index 0000000..0d3a8a7
--- /dev/null
+++ b/concepts.txt
@@ -0,0 +1,17 @@
+beyonce
+katyperry
+gameofthrones
+walkingdead
+bigbangtheory
+ipad
+gameinsight
+climatechange
+barackobama
+disney
+nonprofit
+coffee
+starbucks
+pulpfiction
+lordoftherings
+harvard
+glee
diff --git a/dispatcher.py b/dispatcher.py
new file mode 100644
index 0000000..56fb9f7
--- /dev/null
+++ b/dispatcher.py
@@ -0,0 +1,247 @@
+from glob import glob
+import os.path as pa
+from time import sleep, time
+import requests
+from requests.exceptions import ConnectionError
+from urllib import urlretrieve
+from uuid import uuid1
+import tarfile
+import itertools
+from fnmatch import fnmatch
+import os
+
+
+def get_values(filename):
+    with open(filename) as f:
+        for line in f:
+            stripped = line.strip()
+            if stripped:
+                yield line.strip().split()
+
+
+class Dispatcher:
+
+    def __init__(self, filename):
+        self.followers_queue = {}
+        self.lookups_queue = set()
+        self.short_lookup_queue = set()
+        self.current_followers = set()
+        self.current_lookups = set()
+        self.users = {}
+        self.last_time = 0
+        self.servers = {}
+        self.files = [open(fname) for fname in glob("data/*.txt")]
+        self.filename = filename
+        self.update_servers(first=True)
+
+        for fh in self.files:
+            for line in fh:
+                values = line.strip().split()
+                self.add_user(*values[:3])
+
+        for fname in glob("data/users/lookup*.txt"):
+            self.aggregate_lookups(fname)
+
+        for fname in glob("data/users/[0-9]*.txt"):
+            self.aggregate_followers(fname)
+
+    def __del__(self):
+        for fh in self.files:
+            fh.close()
+
+    def add_user(self, user_id, user_name, followers_count):
+        self.users[user_id] = user_name
+        if int(followers_count) >= 5000:
+            return
+        if (not pa.isfile(pa.join("data", "users", user_id + ".txt"))
+                and user_id not in self.current_followers):
+            self.followers_queue[user_id] = (user_name, followers_count)
+
+    def aggregate_followers(self, filename):
+        basename = pa.splitext(pa.basename(filename))[0]
+        try:
+            self.current_followers.remove(basename)
+        except KeyError:
+            pass
+        for values in get_values(filename):
+            if (values[0] not in self.users
+                    and values[0] not in self.current_lookups):
+                if len(values) == 2:
+                    self.short_lookup_queue.add(tuple(values[:2]))
+                else:
+                    self.lookups_queue.add(values[0])
+
+    def aggregate_lookups(self, filename):
+        for values in get_values(filename):
+            self.users[values[0]] = values[1]
+            try:
+                self.current_lookups.remove(values[0])
+            except KeyError:
+                pass
+
+    def update_servers(self, first=False):
+        ctime = int(time())
+        if self.last_time >= ctime - 15:
+            return
+        else:
+            self.last_time = ctime
+
+        with open(self.filename) as f:
+            for line in f:
+                server = line.strip()
+                self.servers[server] = {}
+                try:
+                    self.servers[server] = requests.get(server +
+                                                        "/status").json()
+                except ConnectionError:
+                    self.servers[server]["dead"] = True
+                else:
+                    self.servers[server]["dead"] = False
+                    if int(self.servers[server]["done"]) > 0:
+                        self.fetch_archive(server)
+                    if first:
+                        requests.get(server + "/restart")
+
+    def fetch_archive(self, server):
+        filename = pa.join("data", "users", "archive-" +
+                           str(uuid1()) + ".tar.gz")
+        urlretrieve(server + "/fetch", filename)
+
+        with tarfile.open(filename, "r:gz") as tar:
+            tar.extractall()
+
+        def get_tar_filenames():
+            with tarfile.open(filename, "r:gz") as tar:
+                for tarinfo in tar:
+                    if tarinfo.isfile():
+                        yield tarinfo.name
+
+        for fname in get_tar_filenames():
+            if fnmatch(fname, "*data/users/lookup*.txt"):
+                self.aggregate_lookups(fname)
+
+        for fname in get_tar_filenames():
+            if fnmatch(fname, "*data/users/[0-9]*.txt"):
+                self.aggregate_followers(fname)
+
+        os.remove(filename)
+
+    def clear_followers(self):
+        shorts = {key: int(value["short"])
+                  for key, value in self.servers.iteritems()
+                  if not value["dead"]}
+        longs = {key: int(value["long"])
+                 for key, value in self.servers.iteritems()
+                 if not value["dead"]}
+        while len(self.followers_queue) > 0:
+            if not shorts:
+                return
+            key, value = self.followers_queue.popitem()
+            if (pa.isfile(pa.join("data", "users", key + ".txt"))
+                    or key in self.current_followers):
+                continue
+            self.current_followers.add(key)
+            if int(value[1]) > 1000:
+                server = min(longs, key=lambda key: longs[key])
+                try:
+                    requests.get(server + "/long", params={"id": key})
+                except ConnectionError:
+                    del shorts[server]
+                    del longs[server]
+                    self.followers_queue[key] = value
+                    self.current_followers.remove(key)
+                else:
+                    longs[server] += 1
+            else:
+                server = min(shorts, key=lambda key: shorts[key])
+                try:
+                    requests.get(server + "/short",
+                                 params={"id": key, "user_name": value[0]})
+                except ConnectionError:
+                    del shorts[server]
+                    del longs[server]
+                    self.followers_queue[key] = value
+                    self.current_followers.remove(key)
+                else:
+                    shorts[server] += 1
+
+    def clear_lookups(self):
+        lookups = {key: int(value["lookup"])
+                   for key, value in self.servers.iteritems()
+                   if not value["dead"]}
+
+        while len(self.lookups_queue) > 100:
+            if not lookups:
+                return
+            l = [self.lookups_queue.pop() for _ in xrange(100)]
+            l = [e for e in l
+                 if e not in self.current_lookups and e not in self.users]
+            for e in l:
+                self.current_lookups.add(e)
+            lstr = ",".join(l)
+            if lstr:
+                server = min(lookups, key=lambda key: lookups[key])
+                try:
+                    requests.post(server + "/lookup", data={"list": lstr})
+                except ConnectionError:
+                    del lookups[server]
+                    for e in l:
+                        self.lookups_queue.add(e)
+                        self.current_lookups.remove(e)
+                else:
+                    lookups[server] += 1
+
+    def clear_short_lookups(self):
+        lookups = {key: int(value["lookup"])
+                   for key, value in self.servers.iteritems()
+                   if not value["dead"]}
+
+        while len(self.short_lookup_queue) > 100:
+            if not lookups:
+                return
+            l = [self.short_lookup_queue.pop() for _ in xrange(100)]
+            l = [e for e in l
+                 if e[0] not in self.current_lookups
+                 and e[0] not in self.users]
+            for e in l:
+                self.current_lookups.add(e[0])
+            lstr = ",".join(itertools.chain.from_iterable(l))
+            if lstr:
+                server = min(lookups, key=lambda key: lookups[key])
+                try:
+                    requests.post(server + "/short_lookup",
+                                  data={"list": lstr})
+                except ConnectionError:
+                    del lookups[server]
+                    for e in l:
+                        self.short_lookup_queue.add(e)
+                        self.current_lookups.remove(e[0])
+                else:
+                    lookups[server] += 1
+
+    def run(self):
+        while True:
+            self.update_servers()
+            for fh in self.files:
+                line = fh.readline()
+                if line:
+                    values = line.strip().split()
+                    self.add_user(*values[:3])
+                    break
+            else:
+                sleep(0.5)
+            self.clear_followers()
+            self.clear_lookups()
+            self.clear_short_lookups()
+            print len(self.current_followers), len(self.current_lookups),\
+                len(self.followers_queue), len(self.lookups_queue)
+
+
+if __name__ == "__main__":
+    import sys
+    try:
+        dispatcher = Dispatcher(sys.argv[1])
+    except IndexError:
+        print "{0} <server_file>".format(sys.argv[0])
+    else:
+        dispatcher.run()
diff --git a/main.py b/main.py
new file mode 100644
--- /dev/null
+++ b/main.py
@@ -0,0 +1,182 @@
+from bottle import route, run, request, static_file
+
+from scraper import Driver
+from api import RequestHandler
+
+from multiprocessing import Process, Queue
+from Queue import Empty
+from time import sleep
+from json import dumps
+from uuid import uuid1
+import tarfile
+import os.path
+import os
+from glob import glob
+
+
+long_queue = Queue()
+short_queue = Queue()
+lookup_queue = Queue()
+short_lookup_queue = Queue()
+processes = []
+done_queue = Queue()
+
+
+def start():
+    global long_queue, short_queue, lookup_queue, done_queue, processes,\
+        short_lookup_queue
+    processes = []
+    long_queue = Queue()
+    short_queue = Queue()
+    short_lookup_queue = Queue()
+    lookup_queue = Queue()
+    with open("api_accounts.txt") as f:
+        for line in f:
+            credentials = line.strip().split()[2:]
+            handler = RequestHandler(*credentials)
+            p = Process(target=api_target, args=(handler, long_queue,
+                                                 short_queue,
+                                                 lookup_queue, done_queue))
+            processes.append(p)
+            p.daemon = True
+            p.start()
+
+    with open("scraping_accounts.txt") as f:
+        for line in f:
+            credentials = line.strip().split()[:2]
+            driver = Driver(*credentials)
+            p = Process(target=scraper_target, args=(driver, short_queue,
+                                                     done_queue))
+            processes.append(p)
+            p.daemon = True
+            p.start()
+
+
+@route('/short_lookup', method='POST')
+def short_lookup():
+    query_list = request.forms.list.split(",")
+    user_list = zip(*[iter(query_list)] * 2)  # this is dark magic
+    short_lookup_queue.put(user_list)
+
+
+@route('/restart')
+def restart():
+    global processes
+    for p in processes:
+        p.terminate()
+    start()
+
+
+@route('/long')
+def long():
+    user_id = request.query.id
+    long_queue.put(user_id)
+
+
+@route('/short')
+def short():
+    user_id = request.query.id
+    user_name = request.query.user_name
+    short_queue.put((user_id, user_name))
+
+
+@route('/lookup', method='POST')
+def lookup():
+    id_list = request.forms.list.split(",")
+    lookup_queue.put(id_list)
+
+
+@route('/status')
+def status():
+    answer_dict = {
+        "long": long_queue.qsize(),
+        "short": short_queue.qsize(),
+        "lookup": lookup_queue.qsize(),
+        "short_lookup": short_lookup_queue.qsize(),
+        "done": done_queue.qsize(),
+        "processes": len([p for p in processes if p.is_alive()]),
+        "users": len(glob("data/users/[0-9]*.txt")),
+        "lookups": len(glob("data/users/lookup*.txt"))
+    }
+    return dumps(answer_dict)
+
+
+@route('/fetch')
+def fetch():
+    for filename in glob("data/users/*.tar.gz"):
+        os.remove(filename)
+
+    def get_filenames():
+        try:
+            while True:
+                yield done_queue.get(False)
+        except Empty:
+            pass
+
+    filename = os.path.join("data", "users", "archive-" +
+                            str(uuid1()) + ".tar.gz")
+    with tarfile.open(filename, "w:gz") as tar:
+        for name in get_filenames():
+            tar.add(name)
+    return static_file(filename, root=".")
+
+
+def scraper_target(driver, short_queue, done_queue):
+    while True:
+        try:
+            user_id, user_name = short_queue.get(False)
+        except Empty:
+            pass
+        else:
+            filename = driver.get_followers(user_id, user_name)
+            done_queue.put(filename)
+        finally:
+            sleep(0.5)
+
+
+def api_target(handler, long_queue, short_queue, lookup_queue, done_queue):
+    while True:
+        if handler.ready("followers"):
+            try:
+                user_id = long_queue.get(False)
+            except Empty:
+                try:
+                    user_id = short_queue.get(False)[0]
+                except Empty:
+                    pass
+                else:
+                    filename = handler.get_followers(user_id)
+                    done_queue.put(filename)
+                    continue
+            else:
+                filename = handler.get_followers(user_id)
+                done_queue.put(filename)
+                continue
+        if handler.ready("lookup"):
+            try:
+                users_list = lookup_queue.get(False)
+            except Empty:
+                try:
+                    user_list = short_lookup_queue.get(False)
+                except Empty:
+                    pass
+                else:
+                    filename = handler.lookup(user[0] for user in user_list)
+                    done_queue.put(filename)
+            else:
+                filename = handler.lookup(users_list)
+                done_queue.put(filename)
+        else:
+            try:
+                user_list = short_lookup_queue.get(False)
+            except Empty:
+                pass
+            else:
+                filename = handler.short_lookup(user_list)
+                done_queue.put(filename)
+
+
+if __name__ == "__main__":
+    import sys
+    start()
+    run(host="0.0.0.0", port=int(sys.argv[1]))
diff --git a/scraper.py b/scraper.py
new file mode 100644
index 0000000..ee7dd8f
--- /dev/null
+++ b/scraper.py
@@ -0,0 +1,86 @@
+from selenium import webdriver
+from selenium.webdriver.support.wait import WebDriverWait
+from selenium.common.exceptions import TimeoutException,\
+    ElementNotVisibleException, NoSuchElementException
+from bs4 import BeautifulSoup, Tag
+
+import os.path
+
+
+class Driver:
+
+    def __init__(self, username, password):
+        self.driver = webdriver.PhantomJS()
+        self.username = username
+        self.password = password
+        self.__connect()
+
+    def __ajax_complete(self):
+        return 0 == self.driver.execute_script("return jQuery.active")
+
+    def __connect(self):
+        driver = self.driver
+        driver.get("http://twitter.com")
+        driver.find_element_by_id("signin-email").send_keys(self.username)
+        elem = driver.find_element_by_id("signin-password")
+        elem.send_keys(self.password)
+        elem.submit()
+
+    def __get_followers(self, username):
+        driver = self.driver
+        driver.get("https://twitter.com/{0}/followers".format(username))
+        try:
+            footer = driver.find_element_by_class_name("timeline-end")
+        except NoSuchElementException:
+            return
+
+        while True:
+            try:
+                if "has-more-items" not in footer.get_attribute("class"):
+                    break
+                footer.click()
+                try:
+                    WebDriverWait(driver,
+                                  5).until(lambda x: self.__ajax_complete(),
+                                           "Timeout waiting for "
+                                           "ajax to return")
+                except TimeoutException:
+                    break
+            except (NoSuchElementException, ElementNotVisibleException):
+                break
+
+        try:
+            fws = driver.find_element_by_id("stream-items-id")
+        except NoSuchElementException:
+            return
+
+        soup = BeautifulSoup(fws.get_attribute("outerHTML"))
+        for follower in soup.ol:
+            if type(follower) == Tag:
+                div = follower.div
+                user_id = div["data-user-id"]
+                screen_name = div["data-screen-name"]
+                yield user_id, screen_name
+
+    def get_followers(self, user_id, username):
+        filename = os.path.join("data", "users", user_id + ".txt")
+        if os.path.isfile(filename):
+            return filename
+        l = list(self.__get_followers(username))
+        with open(filename, "w") as f:
+            for (fid, fname) in l:
+                f.write(fid + " " + fname + "\n")
+        return filename
+
+    def process(self, filename):
+        with open(filename) as f:
+            for line in f:
+                values = line.strip().split()
+                self.get_followers(*values[:2])
+
+
+if __name__ == "__main__":
+    credentials = open("scraping_accounts.txt").readline().strip().split()
+    driver = Driver(*credentials[:2])
+    # driver.get_followers("23302126", "flipper509")
+    print driver.get_profile(100, "thibauthorel")
diff --git a/scraping_accounts.txt b/scraping_accounts.txt
new file mode 100644
index 0000000..43b586f
--- /dev/null
+++ b/scraping_accounts.txt
@@ -0,0 +1,2 @@
+zaran.krleza+2@gmail.com thohLa1ahy8
+zaran.krleza+3@gmail.com ohsh2Dah
diff --git a/servers.txt b/servers.txt
new file mode 100644
index 0000000..b30ec14
--- /dev/null
+++ b/servers.txt
@@ -0,0 +1 @@
+http://localhost:8111
diff --git a/stream.py b/stream.py
new file mode 100644
index 0000000..71cf615
--- /dev/null
+++ b/stream.py
@@ -0,0 +1,62 @@
+from tweepy import StreamListener, OAuthHandler, Stream
+
+from itertools import chain
+from datetime import datetime
+import sys
+import os
+
+
+class Listener(StreamListener):
+
+    def __init__(self, *args, **kwargs):
+        copy = kwargs.copy()
+        del copy["concepts"]
+        super(Listener, self).__init__(*args, **copy)
+        date = datetime.now().replace(microsecond=0).isoformat()
+        self.fhandlers = {concept: open(concept + "_{0}.txt".format(date), "w")
+                          for concept in kwargs["concepts"]}
+
+    def __del__(self, *args, **kwargs):
+        super(Listener, self).__init__(*args, **kwargs)
+        for fh in self.fhandlers.itervalues():
+            fh.close()
+
+    def get_concepts(self, entities):
+        hashtags = (hashtag["text"].lower()
+                    for hashtag in entities["hashtags"])
+        users = (user["screen_name"].lower()
+                 for user in entities["user_mentions"])
+        return set(chain(hashtags, users))
+
+    def on_status(self, tweet):
+        concepts = self.get_concepts(tweet.entities)
+        output = " ".join([str(tweet.user.id), tweet.user.screen_name,
+                           str(tweet.user.followers_count),
+                           str(tweet.user.friends_count),
+                           str(tweet.user.verified),
+                           tweet.created_at.isoformat()])
+        for concept in concepts:
+            if concept in self.fhandlers:
+                fh = self.fhandlers[concept]
+                fh.write(output + "\n")
+                fh.flush()
+
+
+def process(filename, cred_file):
+    with open(filename) as f:
+        concepts = [line.strip() for line in f]
+    credentials = open(cred_file).readline().strip().split()
+    os.chdir("data")
+    entities = [("#" + concept, "@" + concept) for concept in concepts]
+    track = chain.from_iterable(entities)
+    auth = OAuthHandler(*credentials[2:4])
+    auth.set_access_token(*credentials[4:])
+    listener = Listener(concepts=concepts)
+    stream = Stream(auth, listener)
+    stream.filter(track=track)
+
+if __name__ == '__main__':
+    try:
+        process(sys.argv[1], sys.argv[2])
+    except IndexError:
+        print "{0} <concept_file> <credentials_file>".format(sys.argv[0])
