added twitter search

This commit is contained in:
trevorhobenshield
2023-03-11 12:07:02 -08:00
parent 76f7b8737d
commit fd6f3924b2
4 changed files with 227 additions and 23 deletions

View File

@@ -3,7 +3,7 @@
Complete implementation of the undocumented Twitter API
- Frantically written in a day, crude, needs refactoring/redesign, code is repetitive.
- Includes a scraper and automation library.
- Includes Twitter search, a scraper, and an automation library.
### Installation
@@ -57,6 +57,7 @@ r = stats(50393960, session)
```
### Scraping
#### User/Tweet data
```python
from src.scrape import *
from src.login import login
@@ -86,3 +87,20 @@ favoriters = get_favoriters(tweet_ids, session=session)
######### Media (Images/Videos) ########
download_media(tweet_ids, session=session)
```
#### Search
```python
from src.search import search
from src.config.search_config import search_config
search(
    '(#dogs OR #cats) min_retweets:500',
    'min_faves:10000 @elonmusk until:2023-02-16 since:2023-02-01',
    'brasil portugal -argentina',
    'paperswithcode -tensorflow -tf',
    'skateboarding baseball guitar',
    'cheese bread butter',
    'ios android',
    config=search_config
)
```
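
Each positional argument is a separate query, and all queries run concurrently. Going by the defaults in `src/search.py`, each page of raw results is written as a timestamped JSON file under `~/<out>/raw` (with `out='data'` unless overridden), so a minimal sketch with a custom output directory looks like this (the directory name `twitter_data` is just an example):
```python
from src.search import search
from src.config.search_config import search_config

# results land under ~/twitter_data/raw as <timestamp>.json files
search('min_faves:10000 @elonmusk', config=search_config, out='twitter_data')
```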

View File

@@ -0,0 +1,39 @@
search_config = {
    "include_profile_interstitial_type": 1,
    "include_blocking": 1,
    "include_blocked_by": 1,
    "include_followed_by": 1,
    "include_want_retweets": 1,
    "include_mute_edge": 1,
    "include_can_dm": 1,
    "include_can_media_tag": 1,
    "include_ext_has_nft_avatar": 1,
    "include_ext_is_blue_verified": 1,
    "include_ext_verified_type": 1,
    "skip_status": 1,
    "cards_platform": "Web-12",
    "include_cards": 1,
    "include_ext_alt_text": "true",
    "include_ext_limited_action_results": "false",
    "include_quote_count": "true",
    "include_reply_count": 1,
    "tweet_mode": "extended",
    "include_ext_collab_control": "true",
    "include_ext_views": "true",
    "include_entities": "true",
    "include_user_entities": "true",
    "include_ext_media_color": "true",
    "include_ext_media_availability": "true",
    "include_ext_sensitive_media_warning": "true",
    "include_ext_trusted_friends_metadata": "true",
    "send_error_codes": "true",
    "simple_quoted_tweet": "true",
    "query_source": "typed_query",
    "count": 100,
    "q": "",
    "requestContext": "launch",
    "pc": 1,
    "spelling_corrections": 1,
    "include_ext_edit_control": "true",
    "ext": "mediaStats,highlightedLabel,hasNftAvatar,voiceInfo,birdwatchPivot,enrichments,superFollowMetadata,unmentionInfo,editControl,collab_control,vibe"
}
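
These values appear to be passed straight through as query parameters to the `/2/search/adaptive.json` endpoint; `search()` fills in `q` (and `cursor` while paginating), so callers normally only tweak fields such as `count`, which seems to control how many results are requested per page. A minimal sketch of overriding it (the `small_pages` name is just an example):
```python
from src.config.search_config import search_config
from src.search import search

# copy the default config and request smaller pages
small_pages = {**search_config, 'count': 20}
search('paperswithcode -tensorflow', config=small_pages)
```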

View File

@@ -89,7 +89,7 @@ def log(fn=None, *, level: int = logging.DEBUG, info: list = None):
return wrapper
def _get_headers(session: Session) -> dict:
def get_auth_headers(session: Session) -> dict:
return {
'authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
'accept-encoding': 'gzip, deflate, br',
@@ -102,7 +102,7 @@ def _get_headers(session: Session) -> dict:
async def get_status(media_id: str, auth_session: Session, check_after_secs: int = 1):
url = 'https://upload.twitter.com/i/media/upload.json'
headers = _get_headers(auth_session)
headers = get_auth_headers(auth_session)
params = {'command': 'STATUS', 'media_id': media_id}
while 1:
await asyncio.sleep(check_after_secs)
@@ -124,7 +124,7 @@ async def get_status(media_id: str, auth_session: Session, check_after_secs: int
async def upload_media(fname: str, auth_session: Session):
url = 'https://upload.twitter.com/i/media/upload.json'
headers = _get_headers(auth_session)
headers = get_auth_headers(auth_session)
conn = aiohttp.TCPConnector(limit=0, ssl=False, ttl_dns_cache=69)
async with aiohttp.ClientSession(headers=headers, connector=conn) as s:
file = Path(fname)
@@ -174,7 +174,7 @@ async def upload_media(fname: str, auth_session: Session):
def add_alt_text(text: str, media_id: int, session: Session):
params = {"media_id": media_id, "alt_text": {"text": text}}
url = 'https://api.twitter.com/1.1/media/metadata/create.json'
r = session.post(url, headers=_get_headers(session), json=params)
r = session.post(url, headers=get_auth_headers(session), json=params)
return r
@@ -185,7 +185,7 @@ def like_tweet(tweet_id: int, session: Session):
params = operations[operation]
params['variables']['tweet_id'] = tweet_id
url = f"https://api.twitter.com/graphql/{qid}/{operation}"
r = session.post(url, headers=_get_headers(session), json=params)
r = session.post(url, headers=get_auth_headers(session), json=params)
logger.debug(f'{tweet_id = }')
return r
@@ -197,7 +197,7 @@ def unlike_tweet(tweet_id: int, session: Session):
params = operations[operation]
params['variables']['tweet_id'] = tweet_id
url = f"https://api.twitter.com/graphql/{qid}/{operation}"
r = session.post(url, headers=_get_headers(session), json=params)
r = session.post(url, headers=get_auth_headers(session), json=params)
logger.debug(f'{tweet_id = }')
return r
@@ -234,7 +234,7 @@ def create_tweet(text: str, session: Session, media: list[dict | str] = None, **
params['variables'] |= poll_params
url = f"https://api.twitter.com/graphql/{qid}/{operation}"
r = session.post(url, headers=_get_headers(session), json=params)
r = session.post(url, headers=get_auth_headers(session), json=params)
return r
@@ -255,7 +255,7 @@ def delete_tweet(tweet_id: int, session: Session):
params = operations[operation]
params['variables']['tweet_id'] = tweet_id
url = f"https://api.twitter.com/graphql/{qid}/{operation}"
r = session.post(url, headers=_get_headers(session), json=params)
r = session.post(url, headers=get_auth_headers(session), json=params)
if 200 <= r.status_code < 300:
logger.debug(f'{WARN}DELETE{RESET} tweet: {tweet_id}')
return r.json()
@@ -273,7 +273,7 @@ def retweet(tweet_id: int, session: Session):
params = operations[operation]
params['variables']['tweet_id'] = tweet_id
url = f"https://api.twitter.com/graphql/{qid}/{operation}"
r = session.post(url, headers=_get_headers(session), json=params)
r = session.post(url, headers=get_auth_headers(session), json=params)
if 200 <= r.status_code < 300:
logger.debug(f'{SUCCESS}RETWEET{RESET} tweet: {tweet_id}')
return r.json()
@@ -285,7 +285,7 @@ def unretweet(tweet_id: int, session: Session):
params = operations[operation]
params['variables']['source_tweet_id'] = tweet_id
url = f"https://api.twitter.com/graphql/{qid}/{operation}"
r = session.post(url, headers=_get_headers(session), json=params)
r = session.post(url, headers=get_auth_headers(session), json=params)
if 200 <= r.status_code < 300:
logger.debug(f'{SUCCESS}UNRETWEET{RESET} tweet: {tweet_id}')
return r.json()
@@ -298,7 +298,7 @@ def get_tweets(user_id: int, session: Session):
params['variables']['userId'] = user_id
query = build_query(params)
url = f"https://api.twitter.com/graphql/{qid}/{operation}?{query}"
r = session.get(url, headers=_get_headers(session))
r = session.get(url, headers=get_auth_headers(session))
return r.json()
@@ -319,7 +319,7 @@ def follow(user_id: int, session: Session):
"include_ext_verified_type": "1",
"skip_status": "1",
}
headers = _get_headers(session)
headers = get_auth_headers(session)
headers['content-type'] = 'application/x-www-form-urlencoded'
url = 'https://api.twitter.com/1.1/friendships/create.json'
r = session.post(url, headers=headers, data=urlencode(settings))
@@ -344,7 +344,7 @@ def unfollow(user_id: int, session: Session):
"include_ext_verified_type": "1",
"skip_status": "1",
}
headers = _get_headers(session)
headers = get_auth_headers(session)
headers['content-type'] = 'application/x-www-form-urlencoded'
url = 'https://api.twitter.com/1.1/friendships/destroy.json'
r = session.post(url, headers=headers, data=urlencode(settings))
@@ -357,7 +357,7 @@ def mute(user_id: int, session: Session):
'user_id': user_id
}
try:
headers = _get_headers(session)
headers = get_auth_headers(session)
headers['content-type'] = 'application/x-www-form-urlencoded'
url = 'https://api.twitter.com/1.1/mutes/users/create.json'
r = session.post(url, headers=headers, data=urlencode(settings))
@@ -375,7 +375,7 @@ def unmute(user_id: int, session: Session):
'user_id': user_id
}
try:
headers = _get_headers(session)
headers = get_auth_headers(session)
headers['content-type'] = 'application/x-www-form-urlencoded'
url = 'https://api.twitter.com/1.1/mutes/users/destroy.json'
r = session.post(url, headers=headers, data=urlencode(settings))
@@ -407,7 +407,7 @@ def enable_notifications(user_id: int, session: Session):
"skip_status": "1",
}
try:
headers = _get_headers(session)
headers = get_auth_headers(session)
headers['content-type'] = 'application/x-www-form-urlencoded'
url = 'https://api.twitter.com/1.1/friendships/update.json'
r = session.post(url, headers=headers, data=urlencode(settings))
@@ -440,7 +440,7 @@ def disable_notifications(user_id: int, session: Session):
"skip_status": "1",
}
try:
headers = _get_headers(session)
headers = get_auth_headers(session)
headers['content-type'] = 'application/x-www-form-urlencoded'
url = 'https://api.twitter.com/1.1/friendships/update.json'
r = session.post(url, headers=headers, data=urlencode(settings))
@@ -459,7 +459,7 @@ def block(user_id: int, session: Session):
'user_id': user_id
}
try:
headers = _get_headers(session)
headers = get_auth_headers(session)
headers['content-type'] = 'application/x-www-form-urlencoded'
url = 'https://api.twitter.com/1.1/blocks/create.json'
r = session.post(url, headers=headers, data=urlencode(settings))
@@ -477,7 +477,7 @@ def unblock(user_id: int, session: Session):
'user_id': user_id
}
try:
headers = _get_headers(session)
headers = get_auth_headers(session)
headers['content-type'] = 'application/x-www-form-urlencoded'
url = 'https://api.twitter.com/1.1/blocks/destroy.json'
r = session.post(url, headers=headers, data=urlencode(settings))
@@ -502,7 +502,7 @@ def update_search_settings(session: Session, **kwargs):
settings = {}
settings |= kwargs
twid = int(session.cookies.get_dict()['twid'].split('=')[-1].strip('"'))
headers = _get_headers(session=session)
headers = get_auth_headers(session=session)
r = session.post(
url=f'https://api.twitter.com/1.1/strato/column/User/{twid}/search/searchSafety',
headers=headers,
@@ -551,7 +551,7 @@ def update_content_settings(session: Session, **kwargs):
else:
settings = {}
settings |= kwargs
headers = _get_headers(session=session)
headers = get_auth_headers(session=session)
headers['content-type'] = 'application/x-www-form-urlencoded'
r = session.post(
url='https://api.twitter.com/1.1/account/settings.json',
@@ -577,6 +577,6 @@ def stats(rest_id: int, session: Session):
params['variables']['rest_id'] = rest_id
query = build_query(params)
url = f"https://api.twitter.com/graphql/{qid}/{operation}?{query}"
r = session.get(url, headers=_get_headers(session))
r = session.get(url, headers=get_auth_headers(session))
return r.json()

src/search.py Normal file
View File

@@ -0,0 +1,147 @@
import asyncio
import atexit
import json
import logging.config
import random
import re
import time
from pathlib import Path
from urllib.parse import quote, urlencode, parse_qs, urlsplit, urlunsplit
import aiohttp
import requests
from .log.config import log_config
IN_PATH = Path('~').expanduser() / 'data/raw'
OUT_PATH = Path('~').expanduser() / f'data/processed/combined_{time.time_ns()}.json'
reset = '\u001b[0m'
colors = [f'\u001b[{i}m' for i in range(30, 38)]
logging.config.dictConfig(log_config)
logger = logging.getLogger(__name__)
# try:
# if get_ipython().__class__.__name__ == 'ZMQInteractiveShell':
# import nest_asyncio
# nest_asyncio.apply()
# except:
# ...
#
# if sys.platform != 'win32':
# try:
# import uvloop
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# except:
# ...
# else:
# asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
def search(*args, config: dict, out: str = 'data'):
    out_path = make_output_dirs(out)
    return asyncio.run(process(args, config, out_path))


async def process(queries: tuple, config: dict, out: Path) -> tuple:
    conn = aiohttp.TCPConnector(limit=len(queries), ssl=False)
    async with aiohttp.ClientSession(headers=__get_headers(), connector=conn) as s:
        return await asyncio.gather(*(paginate(q, s, config, out) for q in queries))
async def paginate(query: str, session: aiohttp.ClientSession, config: dict, out: Path) -> list[dict]:
    api = 'https://api.twitter.com/2/search/adaptive.json?'
    config['q'] = query
    data, next_cursor = await backoff(lambda: get(session, api, config), query)
    all_data = []
    c = colors.pop() if colors else ''
    while next_cursor:
        logger.debug(f'{c}{query}{reset}')
        config['cursor'] = next_cursor
        data, next_cursor = await backoff(lambda: get(session, api, config), query)
        if data is None:
            # retries exhausted mid-pagination; keep whatever was collected so far
            break
        data['query'] = query
        # persist each page as it arrives
        (out / f'raw/{time.time_ns()}.json').write_text(json.dumps(data, indent=4))
        all_data.append(data)
    return all_data
async def backoff(fn, info, retries=12):
    for i in range(retries + 1):
        try:
            data, next_cursor = await fn()
            if not data.get('globalObjects', {}).get('tweets'):
                raise Exception
            return data, next_cursor
        except Exception as e:
            if i == retries:
                logger.debug(f'Max retries exceeded\n{e}')
                return None, None
            # exponential backoff with jitter; use asyncio.sleep so other queries keep running
            t = 2 ** i + random.random()
            logger.debug(f'No data for: \u001b[1m{info}\u001b[0m | retrying in {t:.2f} seconds\t\t{e}')
            await asyncio.sleep(t)
async def get(session: aiohttp.ClientSession, api: str, params: dict) -> tuple[dict, str]:
    url = set_qs(api, params, update=True)
    r = await session.get(url)
    data = await r.json()
    next_cursor = get_cursor(data)
    return data, next_cursor


def get_cursor(res: dict):
    try:
        for instr in res['timeline']['instructions']:
            if replaceEntry := instr.get('replaceEntry'):
                cursor = replaceEntry['entry']['content']['operation']['cursor']
                if cursor['cursorType'] == 'Bottom':
                    return cursor['value']
                continue
            for entry in instr['addEntries']['entries']:
                if entry['entryId'] == 'cursor-bottom-0':
                    return entry['content']['operation']['cursor']['value']
    except Exception as e:
        logger.debug(e)


def set_qs(url: str, qs: dict, update=False) -> str:
    *_, q, f = urlsplit(url)
    return urlunsplit((*_, urlencode(qs | parse_qs(q) if update else qs, doseq=True, quote_via=quote, safe='()'), f))
def __get_headers(fname: str = None) -> dict:
    # optionally parse headers from a "name: value" text file
    if fname:
        with open(fname) as fp:
            return {y.group(): z.group()
                    for x in fp.read().splitlines()
                    if (y := re.search(r'^[\w-]+(?=:\s)', x),
                        z := re.search(rf'(?<={y.group()}:\s).*', x))}
    # default: public bearer token plus a freshly activated guest token
    headers = {
        'authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    r = requests.post('https://api.twitter.com/1.1/guest/activate.json', headers=headers)
    headers['x-guest-token'] = r.json()['guest_token']
    return headers
def make_output_dirs(path: str) -> Path:
    p = Path('~').expanduser() / path
    (p / 'raw').mkdir(parents=True, exist_ok=True)
    (p / 'processed').mkdir(parents=True, exist_ok=True)
    (p / 'final').mkdir(parents=True, exist_ok=True)
    return p


@atexit.register
def combine_results(in_path: Path = IN_PATH, out_path: Path = OUT_PATH):
    try:
        out_path.write_text(json.dumps({
            k: v
            for p in in_path.iterdir() if p.suffix == '.json'
            for k, v in json.loads(p.read_text())['globalObjects']['tweets'].items()
        }, indent=2))
    except Exception as e:
        logger.debug(f'FAILED TO COMBINE RESULTS, {e}')
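
Since `combine_results` is registered with `atexit`, a finished run with the default paths should leave a single `combined_<timestamp>.json` under `~/data/processed`, keyed by tweet id from each page's `globalObjects.tweets`. A small sketch for loading the most recent one (assuming at least one search has completed):
```python
import json
from pathlib import Path

# pick the newest combined_*.json written by combine_results() at interpreter exit
processed = Path('~').expanduser() / 'data/processed'
latest = max(processed.glob('combined_*.json'), key=lambda p: p.stat().st_mtime)
tweets = json.loads(latest.read_text())  # {tweet_id: tweet payload}
print(f'{len(tweets)} unique tweets')
```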