switching over from requests to httpx

This commit is contained in:
Trevor Hobenshield
2023-04-21 16:29:59 -07:00
parent 305be022dc
commit bf4bfe5332
6 changed files with 58 additions and 66 deletions

View File

@@ -5,7 +5,7 @@ from setuptools import find_packages, setup
install_requires = [
"nest_asyncio",
"aiohttp",
"requests",
"httpx",
"tqdm",
"orjson",
'uvloop; platform_system != "Windows"',
@@ -13,7 +13,7 @@ install_requires = [
setup(
name="twitter-api-client",
version="0.6.9",
version="0.7.1",
python_requires=">=3.11.0",
description="Twitter API",
long_description=dedent('''

View File

@@ -11,7 +11,7 @@ from urllib.parse import urlencode
from uuid import uuid1, getnode
import orjson
from requests import Response
from httpx import Response
from tqdm import tqdm
from .constants import *

View File

@@ -1,23 +1,23 @@
import sys
from requests import Session
from httpx import Client
from .constants import GREEN, YELLOW, RED, BOLD, RESET
from .util import find_key
def update_token(session: Session, key: str, url: str, **kwargs) -> Session:
def update_token(session: Client, key: str, url: str, **kwargs) -> Client:
caller_name = sys._getframe(1).f_code.co_name
try:
headers = {
"authorization": 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
"content-type": "application/json",
"user-agent": 'Mozilla/5.0 (Linux; Android 11; Nokia G20) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.88 Mobile Safari/537.36',
"x-guest-token": session.cookies.get('guest_token'),
"x-csrf-token": session.cookies.get("ct0"),
"x-twitter-auth-type": "OAuth2Session" if session.cookies.get("auth_token") else '',
"x-twitter-active-user": "yes",
"x-twitter-client-language": 'en',
'authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
'content-type': 'application/json',
'user-agent': 'Mozilla/5.0 (Linux; Android 11; Nokia G20) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.88 Mobile Safari/537.36',
'x-guest-token': session.cookies.get('guest_token', ''),
'x-csrf-token': session.cookies.get('ct0', ''),
'x-twitter-auth-type': 'OAuth2Client' if session.cookies.get('auth_token') else '',
'x-twitter-active-user': 'yes',
'x-twitter-client-language': 'en',
}
r = session.post(url, headers=headers, **kwargs)
info = r.json()
@@ -34,11 +34,11 @@ def update_token(session: Session, key: str, url: str, **kwargs) -> Session:
return session
def init_guest_token(session: Session) -> Session:
return update_token(session, 'guest_token', 'https://api.twitter.com/1.1/guest/activate.json', json={})
def init_guest_token(session: Client) -> Client:
return update_token(session, 'guest_token', 'https://api.twitter.com/1.1/guest/activate.json')
def flow_start(session: Session) -> Session:
def flow_start(session: Client) -> Client:
return update_token(session, 'flow_token', 'https://api.twitter.com/1.1/onboarding/task.json',
params={'flow_name': 'login'},
json={
@@ -51,7 +51,7 @@ def flow_start(session: Session) -> Session:
})
def flow_instrumentation(session: Session) -> Session:
def flow_instrumentation(session: Client) -> Client:
return update_token(session, 'flow_token', 'https://api.twitter.com/1.1/onboarding/task.json', json={
"flow_token": session.cookies.get('flow_token'),
"subtask_inputs": [{
@@ -61,7 +61,7 @@ def flow_instrumentation(session: Session) -> Session:
})
def flow_username(session: Session) -> Session:
def flow_username(session: Client) -> Client:
return update_token(session, 'flow_token', 'https://api.twitter.com/1.1/onboarding/task.json', json={
"flow_token": session.cookies.get('flow_token'),
"subtask_inputs": [{
@@ -74,7 +74,7 @@ def flow_username(session: Session) -> Session:
})
def flow_password(session: Session) -> Session:
def flow_password(session: Client) -> Client:
return update_token(session, 'flow_token', 'https://api.twitter.com/1.1/onboarding/task.json', json={
"flow_token": session.cookies.get('flow_token'),
"subtask_inputs": [{
@@ -83,7 +83,7 @@ def flow_password(session: Session) -> Session:
})
def flow_duplication_check(session: Session) -> Session:
def flow_duplication_check(session: Client) -> Client:
return update_token(session, 'flow_token', 'https://api.twitter.com/1.1/onboarding/task.json', json={
"flow_token": session.cookies.get('flow_token'),
"subtask_inputs": [{
@@ -93,7 +93,7 @@ def flow_duplication_check(session: Session) -> Session:
})
def confirm_email(session: Session) -> Session:
def confirm_email(session: Client) -> Client:
return update_token(session, 'flow_token', 'https://api.twitter.com/1.1/onboarding/task.json', json={
"flow_token": session.cookies.get('flow_token'),
"subtask_inputs": [
@@ -107,7 +107,7 @@ def confirm_email(session: Session) -> Session:
})
def execute_login_flow(session: Session) -> Session:
def execute_login_flow(session: Client) -> Client:
session = init_guest_token(session)
for fn in [flow_start, flow_instrumentation, flow_username, flow_password, flow_duplication_check]:
session = fn(session)
@@ -119,8 +119,8 @@ def execute_login_flow(session: Session) -> Session:
return session
def login(email: str, username: str, password: str) -> Session:
session = Session()
def login(email: str, username: str, password: str) -> Client:
session = Client()
session.cookies.update({
"email": email,
"username": username,

View File

@@ -3,7 +3,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps, partial
import orjson
from requests import Session, Response
from httpx import Client, Response
from tqdm import tqdm
from .util import log_config, logging, find_key, save_data, get_cursor, SUCCESS, WARN, ERROR, RESET
@@ -48,7 +48,7 @@ def log(fn=None, *, level: int = logging.DEBUG, info: int = 0) -> callable:
class Scraper:
def __init__(self):
self.session = Session()
self.session = Client()
self.guest_token = self.get_guest_token()
self.api = 'https://twitter.com/i/api/graphql'
@@ -169,7 +169,7 @@ class Scraper:
return dict(sorted({k.lower(): v for k, v in headers.items()}.items()))
def get_guest_token(self) -> str:
return Session().post('https://api.twitter.com/1.1/guest/activate.json', headers={
return Client().post('https://api.twitter.com/1.1/guest/activate.json', headers={
'authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
'user-agent': 'Mozilla/5.0 (Linux; Android 11; Nokia G20) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.88 Mobile Safari/537.36'
}).json()['guest_token']

View File

@@ -8,8 +8,7 @@ from pathlib import Path
from urllib.parse import urlsplit
import orjson
from aiohttp import ClientSession, TCPConnector, ClientResponse
from requests import Response
from httpx import AsyncClient, Response
from tqdm import tqdm
from .constants import *
@@ -28,11 +27,10 @@ except:
...
if platform.system() != 'Windows':
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
else:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
try:
import uvloop
except ImportError as e:
...
class Scraper:
@@ -117,18 +115,20 @@ class Scraper:
return data
def _run(self, ids: list[int | str], operation: tuple, limit=None):
if platform.system() != 'Windows':
with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
return runner.run(self._process(ids, operation, limit))
return asyncio.run(self._process(ids, operation, limit))
async def _process(self, ids: list[int | str], op: tuple, limit: int | None) -> list:
conn = TCPConnector(limit=100, ssl=False, ttl_dns_cache=69)
async with ClientSession(headers=get_headers(self.session), connector=conn) as s:
s.cookie_jar.update_cookies(self.session.cookies)
async with AsyncClient(headers=get_headers(self.session)) as s:
return await asyncio.gather(*(self._paginate(s, _id, op, limit) for _id in ids))
async def _paginate(self, session: ClientSession, _id: int | str | list[int], operation: tuple,
async def _paginate(self, session: AsyncClient, _id: int | str | list[int], operation: tuple,
limit: int | None) -> list[dict]:
r = await self._query(session, _id, operation)
initial_data = await r.json()
initial_data = r.json()
res = [initial_data]
ids = set(find_key(initial_data, 'rest_id'))
dups = 0
@@ -140,13 +140,12 @@ class Scraper:
if prev_len >= limit:
return res
# csrf must match in headers and cookies
if csrf := r.cookies.get("ct0"):
session.headers.update({"x-csrf-token": csrf.value})
session.cookie_jar.update_cookies(r.cookies)
r = await self._query(session, _id, operation, cursor=cursor)
data = await r.json()
data = r.json()
if len(find_key(data, 'entries')[0]) <= 2:
# only top/bottom cursor in result
return res
cursor = get_cursor(data)
ids |= set(find_key(data, 'rest_id'))
@@ -160,15 +159,15 @@ class Scraper:
res.append(data)
return res
async def _query(self, session: ClientSession, _id: int | str | list, operation: tuple, **kwargs) -> ClientResponse:
async def _query(self, session: AsyncClient, _id: int | str | list, operation: tuple, **kwargs) -> Response:
qid, op, k = operation
params = {k: orjson.dumps(v).decode() for k, v in {
'variables': {k: _id} | kwargs | Operation.default_variables,
'features': Operation.default_features,
}.items()}
r = await session.get(f'{self.api}/{qid}/{op}', params=params)
txt = await r.text()
data = await r.json()
txt = r.text
data = r.json()
if self.debug:
self.log(r, txt, data)
if self.save:
@@ -235,8 +234,8 @@ class Scraper:
)
return trends
def log(self, r: ClientResponse | Response, txt: str, data: dict):
status = r.status if isinstance(r, ClientResponse) else r.status_code
def log(self, r: Response | Response, txt: str, data: dict):
status = r.status_code
def stat(r):
if self.debug >= 1:

View File

@@ -8,6 +8,7 @@ from pathlib import Path
import aiohttp
import orjson
from httpx import AsyncClient
from .constants import *
from .login import login
@@ -29,12 +30,8 @@ except:
if platform.system() != 'Windows':
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except:
except ImportError as e:
...
else:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
class Search:
@@ -44,15 +41,16 @@ class Search:
def run(self, *args, out: str = 'data', **kwargs):
out_path = self.make_output_dirs(out)
if platform.system() != 'Windows':
with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
return runner.run(self.process(args, search_config, out_path, **kwargs))
return asyncio.run(self.process(args, search_config, out_path, **kwargs))
async def process(self, queries: tuple, config: dict, out: Path, **kwargs) -> list:
conn = aiohttp.TCPConnector(limit=len(queries), ssl=False)
async with aiohttp.ClientSession(headers=get_headers(self.session), connector=conn) as s:
s.cookie_jar.update_cookies(self.session.cookies)
async with AsyncClient(headers=get_headers(self.session)) as s:
return await asyncio.gather(*(self.paginate(q, s, config, out, **kwargs) for q in queries))
async def paginate(self, query: str, session: aiohttp.ClientSession, config: dict, out: Path, **kwargs) -> list[
async def paginate(self, query: str, session: AsyncClient, config: dict, out: Path, **kwargs) -> list[
dict]:
config['q'] = query
r, data, next_cursor = await self.backoff(lambda: self.get(session, config), query)
@@ -67,11 +65,6 @@ class Search:
logger.debug(f'{c}{query}{reset}')
config['cursor'] = next_cursor
# csrf must match in headers and cookies
if csrf := r.cookies.get("ct0"):
session.headers.update({"x-csrf-token": csrf.value})
session.cookie_jar.update_cookies(r.cookies)
r, data, next_cursor = await self.backoff(lambda: self.get(session, config), query)
data['query'] = query
(out / f'raw/{time.time_ns()}.json').write_text(
@@ -96,10 +89,10 @@ class Search:
logger.debug(f'No data for: \u001b[1m{info}\u001b[0m | retrying in {f"{t:.2f}"} seconds\t\t{e}')
time.sleep(t)
async def get(self, session: aiohttp.ClientSession, params: dict) -> tuple:
async def get(self, session: AsyncClient, params: dict) -> tuple:
url = set_qs(self.api, params, update=True, safe='()')
r = await session.get(url)
data = await r.json()
data = r.json()
next_cursor = self.get_cursor(data)
return r, data, next_cursor