fix logger, update media downloader

This commit is contained in:
Trevor Hobenshield
2023-12-10 18:36:24 -08:00
parent e4b8d82648
commit e0b5795e04
8 changed files with 169 additions and 76 deletions

View File

@@ -1,6 +1,5 @@
import asyncio
import logging.config
import platform
import re
import subprocess
from pathlib import Path

View File

@@ -1,5 +1,5 @@
__title__ = "twitter-api-client"
__description__ = "Implementation of X/Twitter v1, v2, and GraphQL APIs."
__version__ = "0.10.15"
__version__ = "0.10.16"
__author__ = "Trevor Hobenshield"
__license__ = "MIT"

View File

@@ -102,7 +102,8 @@ class Account:
variables['message']['text'] = {'text': text}
res = self.gql('POST', Operation.useSendMessageMutation, variables)
if find_key(res, 'dm_validation_failure_type'):
self.logger.debug(f"{RED}Failed to send DM(s) to {receivers}{RESET}")
if self.debug:
self.logger.debug(f"{RED}Failed to send DM(s) to {receivers}{RESET}")
return res
def tweet(self, text: str, *, media: any = None, **kwargs) -> dict:
@@ -542,16 +543,19 @@ class Account:
_headers = {b'content-type': b'multipart/form-data; boundary=----WebKitFormBoundary' + pad}
r = self.session.post(url=url, headers=headers | _headers, params=params, content=data)
except Exception as e:
self.logger.error(f'Failed to upload chunk, trying alternative method\n{e}')
if self.debug:
self.logger.error(f'Failed to upload chunk, trying alternative method\n{e}')
try:
files = {'media': chunk}
r = self.session.post(url=url, headers=headers, params=params, files=files)
except Exception as e:
self.logger.error(f'Failed to upload chunk\n{e}')
if self.debug:
self.logger.error(f'Failed to upload chunk\n{e}')
return
if r.status_code < 200 or r.status_code > 299:
self.logger.debug(f'{RED}{r.status_code} {r.text}{RESET}')
if self.debug:
self.logger.debug(f'{RED}{r.status_code} {r.text}{RESET}')
i += 1
pbar.update(fp.tell() - pbar.n)
@@ -561,7 +565,8 @@ class Account:
params |= {'original_md5': hashlib.md5(file.read_bytes()).hexdigest()}
r = self.session.post(url=url, headers=headers, params=params)
if r.status_code == 400:
self.logger.debug(f'{RED}{r.status_code} {r.text}{RESET}')
if self.debug:
self.logger.debug(f'{RED}{r.status_code} {r.text}{RESET}')
return
# self.logger.debug(f'processing, please wait...')
@@ -569,12 +574,14 @@ class Account:
while processing_info:
state = processing_info['state']
if error := processing_info.get("error"):
self.logger.debug(f'{RED}{error}{RESET}')
if self.debug:
self.logger.debug(f'{RED}{error}{RESET}')
return
if state == MEDIA_UPLOAD_SUCCEED:
break
if state == MEDIA_UPLOAD_FAIL:
self.logger.debug(f'{RED}{r.status_code} {r.text} {RESET}')
if self.debug:
self.logger.debug(f'{RED}{r.status_code} {r.text} {RESET}')
return
check_after_secs = processing_info.get('check_after_secs', random.randint(1, 5))
time.sleep(check_after_secs)

View File

@@ -69,6 +69,17 @@ ID_MAP = {
'Favoriters': '^user-\d+$'
}
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.3',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/116.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.20',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.3',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5.2 Safari/605.1.15',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/116.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5.1 Safari/605.1.15',
]
@dataclass
class SearchCategory:

View File

@@ -1,11 +1,11 @@
import random
import sys
from httpx import Client
from .constants import YELLOW, RED, BOLD, RESET
from .constants import YELLOW, RED, BOLD, RESET, USER_AGENTS
from .util import find_key
def update_token(client: Client, key: str, url: str, **kwargs) -> Client:
caller_name = sys._getframe(1).f_code.co_name
try:
@@ -158,7 +158,7 @@ def login(email: str, username: str, password: str, **kwargs) -> Client:
headers={
'authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
'content-type': 'application/json',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36',
'user-agent': random.choice(USER_AGENTS),
'x-twitter-active-user': 'yes',
'x-twitter-client-language': 'en',
},

View File

@@ -2,6 +2,8 @@ import asyncio
import logging.config
import math
import platform
from functools import partial
from typing import Generator
import websockets
from httpx import AsyncClient, Limits, ReadTimeout, URL
@@ -14,6 +16,7 @@ from .util import *
try:
if get_ipython().__class__.__name__ == 'ZMQInteractiveShell':
import nest_asyncio
nest_asyncio.apply()
except:
...
@@ -21,6 +24,7 @@ except:
if platform.system() != 'Windows':
try:
import uvloop
uvloop.install()
except ImportError as e:
...
@@ -239,60 +243,97 @@ class Scraper:
"""
return self._run(Operation.UserByRestId, user_ids, **kwargs)
def download_media(self, ids: list[int], photos: bool = True, videos: bool = True, chunk_size: int = 8192, stream: bool = False, out: str = 'media') -> None:
def download_media(self, ids: list[int], photos: bool = True, videos: bool = True, cards: bool = True, hq_img_variant: bool = True, video_thumb: bool = False, out: str = 'media',
metadata_out: str = 'media.json', **kwargs) -> dict:
"""
Download media from tweets by tweet ids.
Download and extract media metadata from Tweets
@param ids: list of tweet ids containing media
@param photos: flag to include photos
@param videos: flag to include videos
@param chunk_size: chunk size for download
@params stream: flag to enable downloading raw stream
@return: None
@param ids: list of Tweet IDs
@param photos: download images
@param videos: download videos
@param cards: download cards
@param hq_img_variant: download highest quality image, options: {"orig", "4096x4096"}
@param video_thumb: download video thumbnails
@param out: output file for media
@param metadata_out: output file for media metadata
@return: media data
"""
out = Path(out)
out.mkdir(parents=True, exist_ok=True)
tweets = self.tweets_by_id(ids) # todo: switch to batch method tweets_by_ids
urls = []
for tweet in tweets:
tweet_id = find_key(tweet, 'id_str')[0]
url = f'https://twitter.com/i/status/{tweet_id}'
media = [y for x in find_key(tweet, 'media') for y in x]
if photos:
photo_urls = list({u for m in media if 'ext_tw_video_thumb' not in (u := m['media_url_https'])})
[urls.append([url, photo]) for photo in photo_urls]
if videos:
video_urls = [x['variants'] for m in media if (x := m.get('video_info'))]
hq_videos = {sorted(v, key=lambda d: d.get('bitrate', 0))[-1]['url'] for v in video_urls}
[urls.append([url, video]) for video in hq_videos]
async def process():
async with AsyncClient(headers=self.session.headers, cookies=self.session.cookies) as client:
tasks = (download(client, x, y, stream) for x, y in urls)
if self.pbar:
return await tqdm_asyncio.gather(*tasks, desc='Downloading media')
return await asyncio.gather(*tasks)
async def process(fns: Generator) -> list:
limits = {
'max_connections': kwargs.pop('max_connections', 1000),
'max_keepalive_connections': kwargs.pop('max_keepalive_connections', None),
'keepalive_expiry': kwargs.pop('keepalive_expiry', 5.0),
}
headers = {'user-agent': random.choice(USER_AGENTS)}
async with AsyncClient(limits=Limits(**limits), headers=headers, http2=True, verify=False, timeout=60, follow_redirects=True) as client:
return await tqdm_asyncio.gather(*(fn(client=client) for fn in fns), desc='Downloading Media')
async def download(client: AsyncClient, post_url: str, cdn_url: str, stream: bool = False) -> None:
try:
name = urlsplit(post_url).path.replace('/', '_')[1:]
def download(urls: list[tuple], out: str) -> Generator:
out = Path(out)
out.mkdir(parents=True, exist_ok=True)
chunk_size = kwargs.pop('chunk_size', None)
async def get(client: AsyncClient, url: str):
tid, cdn_url = url
ext = urlsplit(cdn_url).path.split('/')[-1]
fname = out / f'{name}_{ext}'
if stream:
async with aiofiles.open(fname, 'wb') as fp:
async with client.stream('GET', cdn_url) as r:
async for chunk in r.aiter_raw(chunk_size):
await fp.write(chunk)
else:
r = await client.get(cdn_url)
async with aiofiles.open(fname, 'wb') as fp:
fname = out / f'{tid}_{ext}'
async with aiofiles.open(fname, 'wb') as fp:
async with client.stream('GET', cdn_url) as r:
async for chunk in r.aiter_raw(chunk_size):
await fp.write(chunk)
except Exception as e:
self.logger.error(f'[{RED}error{RESET}] Failed to download media: {post_url} {e}')
return (partial(get, url=u) for u in urls)
asyncio.run(process())
tweets = self.tweets_by_ids(ids, **kwargs)
media = {}
for data in tweets:
for tweet in data.get('data', {}).get('tweetResult', []):
if _id := tweet.get('result', {}).get('rest_id'):
date = tweet.get('result', {}).get('legacy', {}).get('created_at', '')
uid = tweet.get('result', {}).get('legacy', {}).get('user_id_str', '')
media[_id] = {'date': date, 'uid': uid, 'img': set(), 'video': {'thumb': set(), 'video_info': {}, 'hq': set()}, 'card': []}
for _media in (y for x in find_key(tweet['result'], 'media') for y in x if isinstance(x, list)):
if videos:
if vinfo := _media.get('video_info'):
hq = sorted(vinfo.get('variants', []), key=lambda x: -x.get('bitrate', 0))[0]['url']
media[_id]['video']['video_info'] |= vinfo
media[_id]['video']['hq'].add(hq)
if video_thumb:
if url := _media.get('media_url_https', ''):
media[_id]['video']['thumb'].add(url)
if photos:
if (url := _media.get('media_url_https', '')) and "_video_thumb" not in url:
if hq_img_variant:
url = f'{url}?name=orig'
media[_id]['img'].add(url)
if cards:
if card := tweet.get('result', {}).get('card', {}).get('legacy', {}):
media[_id]['card'].extend(card.get('binding_values', []))
if metadata_out:
media = set2list(media)
metadata_out = Path(metadata_out)
metadata_out.parent.mkdir(parents=True, exist_ok=True) # if user specifies subdir
metadata_out.write_bytes(orjson.dumps(media))
res = []
for k, v in media.items():
tmp = []
if photos:
tmp.extend(v['img'])
if videos:
tmp.extend(v['video']['hq'])
if video_thumb:
tmp.extend(v['video']['thumb'])
if cards:
tmp.extend(parse_card_media(v['card']))
res.extend([(k, m) for m in tmp])
asyncio.run(process(download(res, out)))
return media
def trends(self, utc: list[str] = None) -> dict:
"""
@@ -309,7 +350,8 @@ class Scraper:
trends = find_key(r.json(), 'item')
return {t['content']['trend']['name']: t for t in trends}
except Exception as e:
self.logger.error(f'[{RED}error{RESET}] Failed to get trends\n{e}')
if self.debug:
self.logger.error(f'[{RED}error{RESET}] Failed to get trends\n{e}')
async def process():
url = set_qs('https://twitter.com/i/api/2/guide.json', trending_params)
@@ -393,7 +435,8 @@ class Scraper:
r = await client.get(url, params=params)
return r.json()
except Exception as e:
self.logger.error(f'stream not available for playback\n{e}')
if self.debug:
self.logger.error(f'stream not available for playback\n{e}')
async def _init_chat(self, client: AsyncClient, chat_token: str) -> dict:
payload = {'chat_token': chat_token} # stream['chatToken']
@@ -422,7 +465,8 @@ class Scraper:
data = r.json()
res.append(data)
except ReadTimeout as e:
self.logger.debug(f'End of chat data\n{e}')
if self.debug:
self.logger.debug(f'End of chat data\n{e}')
break
parsed = []
@@ -433,7 +477,8 @@ class Scraper:
msg['payload'] = orjson.loads(msg.get('payload', '{}'))
msg['payload']['body'] = orjson.loads(msg['payload'].get('body'))
except Exception as e:
self.logger.error(f'Failed to parse chat message\n{e}')
if self.debug:
self.logger.error(f'Failed to parse chat message\n{e}')
parsed.extend(messages)
return parsed
@@ -451,7 +496,8 @@ class Scraper:
url = '/'.join(location.split('/')[:-1])
return [f'{url}/{chunk}' for chunk in chunks]
except Exception as e:
self.logger.error(f'Failed to get chunks\n{e}')
if self.debug:
self.logger.error(f'Failed to get chunks\n{e}')
def _get_chat_data(self, keys: list[dict]) -> list[dict]:
async def get(c: AsyncClient, key: dict) -> dict:
@@ -527,7 +573,8 @@ class Scraper:
keys, qid, name = operation
# stay within rate-limits
if (l := len(queries)) > MAX_ENDPOINT_LIMIT:
self.logger.warning(f'Got {l} queries, truncating to first 500.')
if self.debug:
self.logger.warning(f'Got {l} queries, truncating to first 500.')
queries = list(queries)[:MAX_ENDPOINT_LIMIT]
if all(isinstance(q, dict) for q in queries):
@@ -578,10 +625,12 @@ class Scraper:
initial_data = r.json()
res = [r]
# ids = get_ids(initial_data, operation) # todo
ids = set(find_key(initial_data, 'rest_id'))
ids = {x for x in find_key(initial_data, 'rest_id') if x[0].isnumeric()}
cursor = get_cursor(initial_data)
except Exception as e:
self.logger.error('Failed to get initial pagination data', e)
if self.debug:
self.logger.error('Failed to get initial pagination data', e)
return
while (dups < DUP_LIMIT) and cursor:
prev_len = len(ids)
@@ -591,11 +640,13 @@ class Scraper:
r = await self._query(client, operation, cursor=cursor, **kwargs)
data = r.json()
except Exception as e:
self.logger.error(f'Failed to get pagination data\n{e}')
if self.debug:
self.logger.error(f'Failed to get pagination data\n{e}')
return
cursor = get_cursor(data)
# ids |= get_ids(data, operation) # todo
ids |= set(find_key(data, 'rest_id'))
ids |= {x for x in find_key(data, 'rest_id') if x[0].isnumeric()}
if self.debug:
self.logger.debug(f'Unique results: {len(ids)}\tcursor: {cursor}')
if prev_len == len(ids):
@@ -732,7 +783,8 @@ class Scraper:
return {"url": data['source']['location'], "room": room}
except Exception as e:
room = space['data']['audioSpace']['metadata']['rest_id']
self.logger.error(f'Failed to get stream info for https://twitter.com/i/spaces/{room}\n{e}')
if self.debug:
self.logger.error(f'Failed to get stream info for https://twitter.com/i/spaces/{room}\n{e}')
async def get_chunks(client: AsyncClient, url: str) -> list[str]:
try:
@@ -745,7 +797,8 @@ class Scraper:
base = '/'.join(str(url).split('/')[:-1])
return [f'{base}/{c}' for c in parse_chunks(r.text)]
except Exception as e:
self.logger.error(f'Failed to get chunks\n{e}')
if self.debug:
self.logger.error(f'Failed to get chunks\n{e}')
async def poll_space(client: AsyncClient, space: dict) -> dict | None:
curr = 0
@@ -766,11 +819,13 @@ class Scraper:
all_chunks |= new_chunks
for c in sort_chunks(new_chunks):
try:
self.logger.debug(f"write: chunk [{chunk_idx(c)}]\t{c}")
if self.debug:
self.logger.debug(f"write: chunk [{chunk_idx(c)}]\t{c}")
r = await client.get(c)
await fp.write(r.content)
except Exception as e:
self.logger.error(f'Failed to write chunk {c}\n{e}')
if self.debug:
self.logger.error(f'Failed to write chunk {c}\n{e}')
curr = 0 if new_chunks else curr + 1
# wait for new chunks. dynamic playlist is updated every 2-3 seconds
await asyncio.sleep(random.random() + 1.5)

View File

@@ -100,17 +100,20 @@ class Search:
data, entries, cursor = await fn()
if errors := data.get('errors'):
for e in errors:
self.logger.warning(f'{YELLOW}{e.get("message")}{RESET}')
if self.debug:
self.logger.warning(f'{YELLOW}{e.get("message")}{RESET}')
return [], [], ''
ids = set(find_key(data, 'entryId'))
if len(ids) >= 2:
return data, entries, cursor
except Exception as e:
if i == retries:
self.logger.debug(f'Max retries exceeded\n{e}')
if self.debug:
self.logger.debug(f'Max retries exceeded\n{e}')
return
t = 2 ** i + random.random()
self.logger.debug(f'Retrying in {f"{t:.2f}"} seconds\t\t{e}')
if self.debug:
self.logger.debug(f'Retrying in {f"{t:.2f}"} seconds\t\t{e}')
await asyncio.sleep(t)
def _init_logger(self, **kwargs) -> Logger:

View File

@@ -10,13 +10,13 @@ import orjson
from aiofiles.os import makedirs
from httpx import Response, Client
from .constants import GREEN, MAGENTA, RED, RESET, ID_MAP, MAX_GQL_CHAR_LIMIT
from .constants import GREEN, MAGENTA, RED, RESET, ID_MAP, MAX_GQL_CHAR_LIMIT, USER_AGENTS
def init_session():
client = Client(headers={
'authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36',
'user-agent': random.choice(USER_AGENTS),
}, follow_redirects=True)
r = client.post('https://api.twitter.com/1.1/guest/activate.json').json()
client.headers.update({
@@ -37,7 +37,7 @@ def batch_ids(ids: list[int | str], char_limit: int = MAX_GQL_CHAR_LIMIT) -> lis
batch.append(x)
length += len(x)
res.append(batch) if batch else ...
print(f'Batched {sum(map(len, res))} ids into {len(res)} requests')
# print(f'Batched {sum(map(len, res))} ids into {len(res)} requests')
return res
@@ -240,6 +240,24 @@ def get_code(cls, retries=5) -> str | None:
time.sleep(t)
def parse_card_media(cards):
    """Extract photo URLs from a Tweet card's binding values.

    Scans *cards* (a list of card ``binding_values`` dicts) for entries keyed
    ``photo_image_full_size_original`` and returns their image URLs sorted by
    pixel area (width * height), largest first.

    @param cards: list of card binding-value dicts, each shaped like
        ``{'key': str, 'value': {'image_value': {'url', 'width', 'height'}}}``
    @return: list of image URLs, highest-resolution first
    """
    sized_urls = []
    for card in cards:
        # Only full-size original photos carry a downloadable image URL.
        if card.get('key') != 'photo_image_full_size_original':
            continue
        img = card.get('value', {}).get('image_value', {})
        url = img.get('url')
        # Skip entries with no URL so callers (media downloaders) never
        # receive None; area defaults to 0 when dimensions are absent.
        if url:
            sized_urls.append((url, img.get('width', 0) * img.get('height', 0)))
    sized_urls.sort(key=lambda pair: pair[1], reverse=True)
    return [url for url, _ in sized_urls]
def set2list(d):
    """Recursively replace every ``set`` with a ``list`` inside nested dicts.

    Used to make media-metadata structures JSON-serializable (sets are not
    valid JSON). Dicts are walked recursively; sets become lists; any other
    value is returned unchanged.
    """
    if isinstance(d, set):
        return list(d)
    if not isinstance(d, dict):
        return d
    return {key: set2list(value) for key, value in d.items()}
# todo: to remove
def get_ids(data: list | dict, operation: tuple) -> set:
expr = ID_MAP[operation[-1]]