diff --git a/scripts/update.py b/scripts/update.py index b94d02e..8bc48af 100644 --- a/scripts/update.py +++ b/scripts/update.py @@ -1,6 +1,5 @@ import asyncio import logging.config -import platform import re import subprocess from pathlib import Path diff --git a/twitter/__version__.py b/twitter/__version__.py index dccb1a4..a9b8684 100644 --- a/twitter/__version__.py +++ b/twitter/__version__.py @@ -1,5 +1,5 @@ __title__ = "twitter-api-client" __description__ = "Implementation of X/Twitter v1, v2, and GraphQL APIs." -__version__ = "0.10.15" +__version__ = "0.10.16" __author__ = "Trevor Hobenshield" __license__ = "MIT" \ No newline at end of file diff --git a/twitter/account.py b/twitter/account.py index 5a1de1d..3984146 100644 --- a/twitter/account.py +++ b/twitter/account.py @@ -102,7 +102,8 @@ class Account: variables['message']['text'] = {'text': text} res = self.gql('POST', Operation.useSendMessageMutation, variables) if find_key(res, 'dm_validation_failure_type'): - self.logger.debug(f"{RED}Failed to send DM(s) to {receivers}{RESET}") + if self.debug: + self.logger.debug(f"{RED}Failed to send DM(s) to {receivers}{RESET}") return res def tweet(self, text: str, *, media: any = None, **kwargs) -> dict: @@ -542,16 +543,19 @@ class Account: _headers = {b'content-type': b'multipart/form-data; boundary=----WebKitFormBoundary' + pad} r = self.session.post(url=url, headers=headers | _headers, params=params, content=data) except Exception as e: - self.logger.error(f'Failed to upload chunk, trying alternative method\n{e}') + if self.debug: + self.logger.error(f'Failed to upload chunk, trying alternative method\n{e}') try: files = {'media': chunk} r = self.session.post(url=url, headers=headers, params=params, files=files) except Exception as e: - self.logger.error(f'Failed to upload chunk\n{e}') + if self.debug: + self.logger.error(f'Failed to upload chunk\n{e}') return if r.status_code < 200 or r.status_code > 299: - self.logger.debug(f'{RED}{r.status_code} {r.text}{RESET}') + if self.debug: + self.logger.debug(f'{RED}{r.status_code} {r.text}{RESET}') i += 1 pbar.update(fp.tell() - pbar.n) @@ -561,7 +565,8 @@ class Account: params |= {'original_md5': hashlib.md5(file.read_bytes()).hexdigest()} r = self.session.post(url=url, headers=headers, params=params) if r.status_code == 400: - self.logger.debug(f'{RED}{r.status_code} {r.text}{RESET}') + if self.debug: + self.logger.debug(f'{RED}{r.status_code} {r.text}{RESET}') return # self.logger.debug(f'processing, please wait...') @@ -569,12 +574,14 @@ class Account: while processing_info: state = processing_info['state'] if error := processing_info.get("error"): - self.logger.debug(f'{RED}{error}{RESET}') + if self.debug: + self.logger.debug(f'{RED}{error}{RESET}') return if state == MEDIA_UPLOAD_SUCCEED: break if state == MEDIA_UPLOAD_FAIL: - self.logger.debug(f'{RED}{r.status_code} {r.text} {RESET}') + if self.debug: + self.logger.debug(f'{RED}{r.status_code} {r.text} {RESET}') return check_after_secs = processing_info.get('check_after_secs', random.randint(1, 5)) time.sleep(check_after_secs) diff --git a/twitter/constants.py b/twitter/constants.py index 5f147f2..96cd2be 100644 --- a/twitter/constants.py +++ b/twitter/constants.py @@ -69,6 +69,17 @@ ID_MAP = { 'Favoriters': '^user-\d+$' } +USER_AGENTS = [ + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36', + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.3', + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/116.0', + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.20', + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.3', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5.2 Safari/605.1.15', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/116.0', + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5.1 Safari/605.1.15', +] @dataclass class SearchCategory: diff --git a/twitter/login.py b/twitter/login.py index ff98fe0..0e35efb 100644 --- a/twitter/login.py +++ b/twitter/login.py @@ -1,11 +1,11 @@ +import random import sys from httpx import Client -from .constants import YELLOW, RED, BOLD, RESET +from .constants import YELLOW, RED, BOLD, RESET, USER_AGENTS from .util import find_key - def update_token(client: Client, key: str, url: str, **kwargs) -> Client: caller_name = sys._getframe(1).f_code.co_name try: @@ -158,7 +158,7 @@ def login(email: str, username: str, password: str, **kwargs) -> Client: headers={ 'authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA', 'content-type': 'application/json', - 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36', + 'user-agent': random.choice(USER_AGENTS), 'x-twitter-active-user': 'yes', 'x-twitter-client-language': 'en', }, diff --git a/twitter/scraper.py b/twitter/scraper.py index 13c7717..968a9e4 100644 --- a/twitter/scraper.py +++ b/twitter/scraper.py @@ -2,6 +2,8 @@ import asyncio import logging.config import math import platform +from functools import partial +from typing import Generator import websockets from httpx import AsyncClient, Limits, ReadTimeout, URL @@ -14,6 +16,7 @@ from .util import * try: if get_ipython().__class__.__name__ == 'ZMQInteractiveShell': import nest_asyncio + nest_asyncio.apply() except: ... @@ -21,6 +24,7 @@ except: if platform.system() != 'Windows': try: import uvloop + uvloop.install() except ImportError as e: ... @@ -239,60 +243,97 @@ class Scraper: """ return self._run(Operation.UserByRestId, user_ids, **kwargs) - def download_media(self, ids: list[int], photos: bool = True, videos: bool = True, chunk_size: int = 8192, stream: bool = False, out: str = 'media') -> None: + def download_media(self, ids: list[int], photos: bool = True, videos: bool = True, cards: bool = True, hq_img_variant: bool = True, video_thumb: bool = False, out: str = 'media', + metadata_out: str = 'media.json', **kwargs) -> dict: """ - Download media from tweets by tweet ids. + Download and extract media metadata from Tweets - @param ids: list of tweet ids containing media - @param photos: flag to include photos - @param videos: flag to include videos - @param chunk_size: chunk size for download - @params stream: flag to enable downloading raw stream - @return: None + @param ids: list of Tweet IDs + @param photos: download images + @param videos: download videos + @param cards: download cards + @param hq_img_variant: download highest quality image, options: {"orig", "4096x4096"} + @param video_thumb: download video thumbnails + @param out: output file for media + @param metadata_out: output file for media metadata + @return: media data """ - out = Path(out) - out.mkdir(parents=True, exist_ok=True) - tweets = self.tweets_by_id(ids) # todo: switch to batch method tweets_by_ids - urls = [] - for tweet in tweets: - tweet_id = find_key(tweet, 'id_str')[0] - url = f'https://twitter.com/i/status/{tweet_id}' - media = [y for x in find_key(tweet, 'media') for y in x] - if photos: - photo_urls = list({u for m in media if 'ext_tw_video_thumb' not in (u := m['media_url_https'])}) - [urls.append([url, photo]) for photo in photo_urls] - if videos: - video_urls = [x['variants'] for m in media if (x := m.get('video_info'))] - hq_videos = {sorted(v, key=lambda d: d.get('bitrate', 0))[-1]['url'] for v in video_urls} - [urls.append([url, video]) for video in hq_videos] - async def process(): - async with AsyncClient(headers=self.session.headers, cookies=self.session.cookies) as client: - tasks = (download(client, x, y, stream) for x, y in urls) - if self.pbar: - return await tqdm_asyncio.gather(*tasks, desc='Downloading media') - return await asyncio.gather(*tasks) + async def process(fns: Generator) -> list: + limits = { + 'max_connections': kwargs.pop('max_connections', 1000), + 'max_keepalive_connections': kwargs.pop('max_keepalive_connections', None), + 'keepalive_expiry': kwargs.pop('keepalive_expiry', 5.0), + } + headers = {'user-agent': random.choice(USER_AGENTS)} + async with AsyncClient(limits=Limits(**limits), headers=headers, http2=True, verify=False, timeout=60, follow_redirects=True) as client: + return await tqdm_asyncio.gather(*(fn(client=client) for fn in fns), desc='Downloading Media') - async def download(client: AsyncClient, post_url: str, cdn_url: str, stream: bool = False) -> None: - try: - name = urlsplit(post_url).path.replace('/', '_')[1:] + def download(urls: list[tuple], out: str) -> Generator: + out = Path(out) + out.mkdir(parents=True, exist_ok=True) + chunk_size = kwargs.pop('chunk_size', None) + + async def get(client: AsyncClient, url: str): + tid, cdn_url = url ext = urlsplit(cdn_url).path.split('/')[-1] - fname = out / f'{name}_{ext}' - if stream: - async with aiofiles.open(fname, 'wb') as fp: - async with client.stream('GET', cdn_url) as r: - async for chunk in r.aiter_raw(chunk_size): - await fp.write(chunk) - else: - r = await client.get(cdn_url) - async with aiofiles.open(fname, 'wb') as fp: + fname = out / f'{tid}_{ext}' + async with aiofiles.open(fname, 'wb') as fp: + async with client.stream('GET', cdn_url) as r: async for chunk in r.aiter_raw(chunk_size): await fp.write(chunk) - except Exception as e: - self.logger.error(f'[{RED}error{RESET}] Failed to download media: {post_url} {e}') + return (partial(get, url=u) for u in urls) - asyncio.run(process()) + tweets = self.tweets_by_ids(ids, **kwargs) + media = {} + for data in tweets: + for tweet in data.get('data', {}).get('tweetResult', []): + if _id := tweet.get('result', {}).get('rest_id'): + + date = tweet.get('result', {}).get('legacy', {}).get('created_at', '') + uid = tweet.get('result', {}).get('legacy', {}).get('user_id_str', '') + media[_id] = {'date': date, 'uid': uid, 'img': set(), 'video': {'thumb': set(), 'video_info': {}, 'hq': set()}, 'card': []} + + for _media in (y for x in find_key(tweet['result'], 'media') for y in x if isinstance(x, list)): + if videos: + if vinfo := _media.get('video_info'): + hq = sorted(vinfo.get('variants', []), key=lambda x: -x.get('bitrate', 0))[0]['url'] + media[_id]['video']['video_info'] |= vinfo + media[_id]['video']['hq'].add(hq) + + if video_thumb: + if url := _media.get('media_url_https', ''): + media[_id]['video']['thumb'].add(url) + + if photos: + if (url := _media.get('media_url_https', '')) and "_video_thumb" not in url: + if hq_img_variant: + url = f'{url}?name=orig' + media[_id]['img'].add(url) + if cards: + if card := tweet.get('result', {}).get('card', {}).get('legacy', {}): + media[_id]['card'].extend(card.get('binding_values', [])) + if metadata_out: + media = set2list(media) + metadata_out = Path(metadata_out) + metadata_out.parent.mkdir(parents=True, exist_ok=True) # if user specifies subdir + metadata_out.write_bytes(orjson.dumps(media)) + + res = [] + for k, v in media.items(): + tmp = [] + if photos: + tmp.extend(v['img']) + if videos: + tmp.extend(v['video']['hq']) + if video_thumb: + tmp.extend(v['video']['thumb']) + if cards: + tmp.extend(parse_card_media(v['card'])) + res.extend([(k, m) for m in tmp]) + asyncio.run(process(download(res, out))) + return media def trends(self, utc: list[str] = None) -> dict: """ @@ -309,7 +350,8 @@ class Scraper: trends = find_key(r.json(), 'item') return {t['content']['trend']['name']: t for t in trends} except Exception as e: - self.logger.error(f'[{RED}error{RESET}] Failed to get trends\n{e}') + if self.debug: + self.logger.error(f'[{RED}error{RESET}] Failed to get trends\n{e}') async def process(): url = set_qs('https://twitter.com/i/api/2/guide.json', trending_params) @@ -393,7 +435,8 @@ class Scraper: r = await client.get(url, params=params) return r.json() except Exception as e: - self.logger.error(f'stream not available for playback\n{e}') + if self.debug: + self.logger.error(f'stream not available for playback\n{e}') async def _init_chat(self, client: AsyncClient, chat_token: str) -> dict: payload = {'chat_token': chat_token} # stream['chatToken'] @@ -422,7 +465,8 @@ class Scraper: data = r.json() res.append(data) except ReadTimeout as e: - self.logger.debug(f'End of chat data\n{e}') + if self.debug: + self.logger.debug(f'End of chat data\n{e}') break parsed = [] @@ -433,7 +477,8 @@ class Scraper: msg['payload'] = orjson.loads(msg.get('payload', '{}')) msg['payload']['body'] = orjson.loads(msg['payload'].get('body')) except Exception as e: - self.logger.error(f'Failed to parse chat message\n{e}') + if self.debug: + self.logger.error(f'Failed to parse chat message\n{e}') parsed.extend(messages) return parsed @@ -451,7 +496,8 @@ class Scraper: url = '/'.join(location.split('/')[:-1]) return [f'{url}/{chunk}' for chunk in chunks] except Exception as e: - self.logger.error(f'Failed to get chunks\n{e}') + if self.debug: + self.logger.error(f'Failed to get chunks\n{e}') def _get_chat_data(self, keys: list[dict]) -> list[dict]: async def get(c: AsyncClient, key: dict) -> dict: @@ -527,7 +573,8 @@ class Scraper: keys, qid, name = operation # stay within rate-limits if (l := len(queries)) > MAX_ENDPOINT_LIMIT: - self.logger.warning(f'Got {l} queries, truncating to first 500.') + if self.debug: + self.logger.warning(f'Got {l} queries, truncating to first 500.') queries = list(queries)[:MAX_ENDPOINT_LIMIT] if all(isinstance(q, dict) for q in queries): @@ -578,10 +625,12 @@ class Scraper: initial_data = r.json() res = [r] # ids = get_ids(initial_data, operation) # todo - ids = set(find_key(initial_data, 'rest_id')) + ids = {x for x in find_key(initial_data, 'rest_id') if x[0].isnumeric()} + cursor = get_cursor(initial_data) except Exception as e: - self.logger.error('Failed to get initial pagination data', e) + if self.debug: + self.logger.error('Failed to get initial pagination data', e) return while (dups < DUP_LIMIT) and cursor: prev_len = len(ids) @@ -591,11 +640,13 @@ class Scraper: r = await self._query(client, operation, cursor=cursor, **kwargs) data = r.json() except Exception as e: - self.logger.error(f'Failed to get pagination data\n{e}') + if self.debug: + self.logger.error(f'Failed to get pagination data\n{e}') return cursor = get_cursor(data) # ids |= get_ids(data, operation) # todo - ids |= set(find_key(data, 'rest_id')) + ids |= {x for x in find_key(data, 'rest_id') if x[0].isnumeric()} + if self.debug: self.logger.debug(f'Unique results: {len(ids)}\tcursor: {cursor}') if prev_len == len(ids): @@ -732,7 +783,8 @@ class Scraper: return {"url": data['source']['location'], "room": room} except Exception as e: room = space['data']['audioSpace']['metadata']['rest_id'] - self.logger.error(f'Failed to get stream info for https://twitter.com/i/spaces/{room}\n{e}') + if self.debug: + self.logger.error(f'Failed to get stream info for https://twitter.com/i/spaces/{room}\n{e}') async def get_chunks(client: AsyncClient, url: str) -> list[str]: try: @@ -745,7 +797,8 @@ class Scraper: base = '/'.join(str(url).split('/')[:-1]) return [f'{base}/{c}' for c in parse_chunks(r.text)] except Exception as e: - self.logger.error(f'Failed to get chunks\n{e}') + if self.debug: + self.logger.error(f'Failed to get chunks\n{e}') async def poll_space(client: AsyncClient, space: dict) -> dict | None: curr = 0 @@ -766,11 +819,13 @@ class Scraper: all_chunks |= new_chunks for c in sort_chunks(new_chunks): try: - self.logger.debug(f"write: chunk [{chunk_idx(c)}]\t{c}") + if self.debug: + self.logger.debug(f"write: chunk [{chunk_idx(c)}]\t{c}") r = await client.get(c) await fp.write(r.content) except Exception as e: - self.logger.error(f'Failed to write chunk {c}\n{e}') + if self.debug: + self.logger.error(f'Failed to write chunk {c}\n{e}') curr = 0 if new_chunks else curr + 1 # wait for new chunks. dynamic playlist is updated every 2-3 seconds await asyncio.sleep(random.random() + 1.5) diff --git a/twitter/search.py b/twitter/search.py index 9e406ea..16f996c 100644 --- a/twitter/search.py +++ b/twitter/search.py @@ -100,17 +100,20 @@ class Search: data, entries, cursor = await fn() if errors := data.get('errors'): for e in errors: - self.logger.warning(f'{YELLOW}{e.get("message")}{RESET}') + if self.debug: + self.logger.warning(f'{YELLOW}{e.get("message")}{RESET}') return [], [], '' ids = set(find_key(data, 'entryId')) if len(ids) >= 2: return data, entries, cursor except Exception as e: if i == retries: - self.logger.debug(f'Max retries exceeded\n{e}') + if self.debug: + self.logger.debug(f'Max retries exceeded\n{e}') return t = 2 ** i + random.random() - self.logger.debug(f'Retrying in {f"{t:.2f}"} seconds\t\t{e}') + if self.debug: + self.logger.debug(f'Retrying in {f"{t:.2f}"} seconds\t\t{e}') await asyncio.sleep(t) def _init_logger(self, **kwargs) -> Logger: diff --git a/twitter/util.py b/twitter/util.py index 08d997a..92deec6 100644 --- a/twitter/util.py +++ b/twitter/util.py @@ -10,13 +10,13 @@ import orjson from aiofiles.os import makedirs from httpx import Response, Client -from .constants import GREEN, MAGENTA, RED, RESET, ID_MAP, MAX_GQL_CHAR_LIMIT +from .constants import GREEN, MAGENTA, RED, RESET, ID_MAP, MAX_GQL_CHAR_LIMIT, USER_AGENTS def init_session(): client = Client(headers={ 'authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA', - 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36', + 'user-agent': random.choice(USER_AGENTS), }, follow_redirects=True) r = client.post('https://api.twitter.com/1.1/guest/activate.json').json() client.headers.update({ @@ -37,7 +37,7 @@ def batch_ids(ids: list[int | str], char_limit: int = MAX_GQL_CHAR_LIMIT) -> lis batch.append(x) length += len(x) res.append(batch) if batch else ... - print(f'Batched {sum(map(len, res))} ids into {len(res)} requests') + # print(f'Batched {sum(map(len, res))} ids into {len(res)} requests') return res @@ -240,6 +240,24 @@ def get_code(cls, retries=5) -> str | None: time.sleep(t) +def parse_card_media(cards): + res = [] + for c in cards: + img = c.get('value', {}).get('image_value', {}) + if c.get('key') == 'photo_image_full_size_original': + url = img.get('url') + res.append([url, img.get('width', 0) * img.get('height', 0)]) + return [t[0] for t in sorted(res, key=lambda x: -x[1])] + + +def set2list(d): + if isinstance(d, dict): + return {k: set2list(v) for k, v in d.items()} + if isinstance(d, set): + return list(d) + return d + + # todo: to remove def get_ids(data: list | dict, operation: tuple) -> set: expr = ID_MAP[operation[-1]]