fix csrf header/cookie mismatch

This commit is contained in:
Trevor Hobenshield
2023-03-18 16:44:06 -07:00
parent 224ed31deb
commit 4c49768bd5
9 changed files with 527 additions and 511 deletions

137
readme.md
View File

@@ -11,78 +11,77 @@ pip install twitter-api-client
### Automation ### Automation
```python ```python
from twitter.main import * from twitter import account
from twitter.login import login from twitter.login import login
usr, pwd = ..., ... username,password = ...,...
s = login(usr, pwd) # session s = login(username, password) # session
create_poll(s, 'test poll', ['hello', 'world', 'foo', 'bar'], 10080) account.create_poll(s, 'test poll', ['hello', 'world', 'foo', 'bar'], 10080)
# DM 1 user # DM 1 user
dm(s, [111], 'hello world', filename='test.png') account.dm(s, [111], 'hello world', filename='test.png')
# DM group of users # DM group of users
dm(s, [111, 222, 333], 'foo bar', filename='test.mp4') account.dm(s, [111, 222, 333], 'foo bar', filename='test.mp4')
# tweets # tweets
tweet(s, 'test 123') account.tweet(s, 'test 123')
tweet(s, 'test 123', media=['test.mp4']) account.tweet(s, 'test 123', media=['test.mp4'])
tweet(s, 'test 123', media=['test.jpg', 'test.png', 'test.jpeg', 'test.jfif']) account.tweet(s, 'test 123', media=['test.jpg', 'test.png', 'test.jpeg', 'test.jfif'])
tweet(s, 'test 123', media=[{'file': 'test.jpeg', 'tagged_users': [123234345456], 'alt': 'some image'}]) account.tweet(s, 'test 123', media=[{'file': 'test.jpeg', 'tagged_users': [123234345456], 'alt': 'some image'}])
untweet(s, 123) account.untweet(s, 123)
retweet(s, 1633609779745820675) account.retweet(s, 1633609779745820675)
unretweet(s, 1633609779745820675) account.unretweet(s, 1633609779745820675)
quote(s, 1633609779745820675, 'elonmusk', 'test 123') account.quote(s, 1633609779745820675, 'elonmusk', 'test 123')
comment(s, 1633609779745820675, 'test 123') account.comment(s, 1633609779745820675, 'test 123')
like(s, 1633609779745820675) account.like(s, 1633609779745820675)
unlike(s, 1633609779745820675) account.unlike(s, 1633609779745820675)
bookmark(s, 1633609779745820675) account.bookmark(s, 1633609779745820675)
unbookmark(s, 1633609779745820675) account.unbookmark(s, 1633609779745820675)
pin(s, 1635479755364651008) account.pin(s, 1635479755364651008)
unpin(s, 1635479755364651008) account.unpin(s, 1635479755364651008)
# users # users
follow(s, 50393960) account.follow(s, 50393960)
unfollow(s, 50393960) account.unfollow(s, 50393960)
mute(s, 50393960) account.mute(s, 50393960)
unmute(s, 50393960) account.unmute(s, 50393960)
enable_notifications(s, 50393960) account.enable_notifications(s, 50393960)
disable_notifications(s, 50393960) account.disable_notifications(s, 50393960)
block(s, 50393960) account.block(s, 50393960)
unblock(s, 50393960) account.unblock(s, 50393960)
# other # other
stats(s, 50393960) account.stats(s, 50393960)
# user profile # user profile
update_profile_image(s, 'test.jpg') account.update_profile_image(s, 'test.jpg')
update_profile_banner(s, 'test.png') account.update_profile_banner(s, 'test.png')
update_profile_info(s, name='Foo Bar', description='Test 123', location='Victoria, BC') account.update_profile_info(s, name='Foo Bar', description='Test 123', location='Victoria, BC')
# topics # topics
follow_topic(s, 808713037230157824) account.follow_topic(s, 808713037230157824)
unfollow_topic(s, 808713037230157824) account.unfollow_topic(s, 808713037230157824)
# lists # lists
create_list(s, 'My List', 'description of my list', private=False) account.create_list(s, 'My List', 'description of my list', private=False)
update_list(s, 123456, 'My Updated List', 'some updated description', private=False) account.update_list(s, 123456, 'My Updated List', 'some updated description', private=False)
update_list_banner(s, 123456, 'test.png') account.update_list_banner(s, 123456, 'test.png')
delete_list_banner(s, 123456) account.delete_list_banner(s, 123456)
add_list_member(s, 123456, 50393960) account.add_list_member(s, 123456, 50393960)
remove_list_member(s, 123456, 50393960) account.remove_list_member(s, 123456, 50393960)
delete_list(s, 123456) account.delete_list(s, 123456)
pin_list(s, 123456) account.pin_list(s, 123456)
unpin_list(s, 123456) account.unpin_list(s, 123456)
# refresh all pinned lists in this order # refresh all pinned lists in this order
update_pinned_lists(s, [123, 234, 345, 456]) account.update_pinned_lists(s, [123, 234, 345, 456])
# unpin all lists # unpin all lists
update_pinned_lists(s, []) account.update_pinned_lists(s, [])
# example configuration # example configuration
update_account_settings(s, { account.update_settings(s, {
"address_book_live_sync_enabled": False, "address_book_live_sync_enabled": False,
"allow_ads_personalization": False, "allow_ads_personalization": False,
"allow_authenticated_periscope_requests": True, "allow_authenticated_periscope_requests": True,
@@ -128,7 +127,7 @@ update_account_settings(s, {
}) })
# example configuration # example configuration
update_search_settings(s, { account.update_search_settings(s, {
"optInFiltering": True, # filter out nsfw content "optInFiltering": True, # filter out nsfw content
"optInBlocking": True, # filter out blocked accounts "optInBlocking": True, # filter out blocked accounts
}) })
@@ -141,53 +140,47 @@ update_search_settings(s, {
#### Get all user/tweet data #### Get all user/tweet data
```python ```python
from twitter.scrape import * from twitter import scraper
from twitter.login import login from twitter.login import login
usr, pwd = ..., ... username,password = ...,...
s = login(usr, pwd) # session s = login(username, password) # session
user_ids = [...]
usernames = [...]
tweet_ids = [...]
####### User Data ######## ####### User Data ########
users = get_user_by_screen_name(s, usernames) users = scraper.get_user_by_screen_name(s, ['bob123', 'jim456', 'stanley789'])
tweets = get_user_tweets(s, user_ids) tweets = scraper.get_user_tweets(s, [123, 234, 345])
likes = get_likes(s, user_ids) likes = scraper.get_likes(s, [123, 234, 345])
tweets_and_replies = get_tweets_and_replies(s, user_ids) tweets_and_replies = scraper.get_tweets_and_replies(s, [123, 234, 345])
media = get_media(s, user_ids) media = scraper.get_media(s, [123, 234, 345])
following = get_following(s, user_ids) following = scraper.get_following(s, [123, 234, 345])
followers = get_followers(s, user_ids) followers = scraper.get_followers(s, [123, 234, 345])
######## Tweet Data ######## ######## Tweet Data ########
tweet = get_tweet_by_rest_id(s, tweet_ids) tweet = scraper.get_tweets_by_rest_id(s, [456, 567, 678])
tweet_detail = get_tweets(s, tweet_ids) tweet_detail = scraper.get_tweets(s, [456, 567, 678])
retweeters = get_retweeters(s, tweet_ids) retweeters = scraper.get_retweeters(s, [456, 567, 678])
favoriters = get_favoriters(s, tweet_ids) favoriters = scraper.get_favoriters(s, [456, 567, 678])
download_media(s, tweet_ids) scraper.download_media(s, [456, 567, 678])
``` ```
#### Most recent ~100 results of user/tweet data #### Most recent ~100 results of user/tweet data
```python ```python
from twitter.login import login from twitter.login import login
from twitter.scrape import query
from twitter.constants import Operation from twitter.constants import Operation
from twitter import scraper
from functools import partial from functools import partial
username, password = ..., ... username, password = ..., ...
session = login(username, password) session = login(username, password)
user_ids = [123, 234, 345, 456] user_ids = [123, 234, 345, 456]
user_query = partial(query, session, user_ids) user_query = partial(scraper.query, session, user_ids)
tweets = user_query(Operation.Data.UserTweets) tweets = user_query(Operation.Data.UserTweets)
likes = user_query(Operation.Data.Likes) likes = user_query(Operation.Data.Likes)
followers = user_query(Operation.Data.Followers) followers = user_query(Operation.Data.Followers)
``` ```
#### Search #### Search

160
setup.py
View File

@@ -14,89 +14,93 @@ if sys.platform != 'win32':
setup( setup(
name="twitter-api-client", name="twitter-api-client",
version="0.3.4", version="0.3.5",
description="Twitter API", description="Twitter API",
long_description=dedent(''' long_description=dedent('''
## The Undocumented Twitter API Complete implementation of the undocumented Twitter API
Includes tools to **scrape**, **automate**, and **search** twitter
### Installation
```bash
pip install twitter-api-client
```
A free alternative to the Twitter API
### Automation ### Automation
```python ```python
from twitter.main import * from twitter import account
from twitter.login import login from twitter.login import login
usr, pwd = ..., ... username,password = ...,...
s = login(usr, pwd) # session s = login(username, password) # session
account.create_poll(s, 'test poll', ['hello', 'world', 'foo', 'bar'], 10080)
create_poll(s, 'test poll', ['hello', 'world', 'foo', 'bar'], 10080)
# DM 1 user # DM 1 user
dm(s, [111], 'hello world', filename='test.png') account.dm(s, [111], 'hello world', filename='test.png')
# DM group of users # DM group of users
dm(s, [111,222,333], 'foo bar', filename='test.mp4') account.dm(s, [111, 222, 333], 'foo bar', filename='test.mp4')
# tweets # tweets
tweet(s, 'test 123') account.tweet(s, 'test 123')
tweet(s, 'test 123', media=['test.mp4']) account.tweet(s, 'test 123', media=['test.mp4'])
tweet(s, 'test 123', media=['test.jpg', 'test.png', 'test.jpeg', 'test.jfif']) account.tweet(s, 'test 123', media=['test.jpg', 'test.png', 'test.jpeg', 'test.jfif'])
tweet(s, 'test 123', media=[{'file': 'test.jpeg', 'tagged_users': [123234345456], 'alt': 'some image'}]) account.tweet(s, 'test 123', media=[{'file': 'test.jpeg', 'tagged_users': [123234345456], 'alt': 'some image'}])
untweet(s, 123) account.untweet(s, 123)
retweet(s, 1633609779745820675) account.retweet(s, 1633609779745820675)
unretweet(s, 1633609779745820675) account.unretweet(s, 1633609779745820675)
quote(s, 1633609779745820675, 'elonmusk', 'test 123') account.quote(s, 1633609779745820675, 'elonmusk', 'test 123')
comment(s, 1633609779745820675, 'test 123') account.comment(s, 1633609779745820675, 'test 123')
like(s, 1633609779745820675) account.like(s, 1633609779745820675)
unlike(s, 1633609779745820675) account.unlike(s, 1633609779745820675)
bookmark(s, 1633609779745820675) account.bookmark(s, 1633609779745820675)
unbookmark(s, 1633609779745820675) account.unbookmark(s, 1633609779745820675)
pin(s, 1635479755364651008) account.pin(s, 1635479755364651008)
unpin(s, 1635479755364651008) account.unpin(s, 1635479755364651008)
# users # users
follow(s, 50393960) account.follow(s, 50393960)
unfollow(s, 50393960) account.unfollow(s, 50393960)
mute(s, 50393960) account.mute(s, 50393960)
unmute(s, 50393960) account.unmute(s, 50393960)
enable_notifications(s, 50393960) account.enable_notifications(s, 50393960)
disable_notifications(s, 50393960) account.disable_notifications(s, 50393960)
block(s, 50393960) account.block(s, 50393960)
unblock(s, 50393960) account.unblock(s, 50393960)
# other # other
stats(s, 50393960) account.stats(s, 50393960)
# user profile # user profile
update_profile_image(s, 'test.jpg') account.update_profile_image(s, 'test.jpg')
update_profile_banner(s, 'test.png') account.update_profile_banner(s, 'test.png')
update_profile_info(s, name='Foo Bar', description='Test 123', location='Victoria, BC') account.update_profile_info(s, name='Foo Bar', description='Test 123', location='Victoria, BC')
# topics # topics
follow_topic(s, 808713037230157824) account.follow_topic(s, 808713037230157824)
unfollow_topic(s, 808713037230157824) account.unfollow_topic(s, 808713037230157824)
# lists # lists
create_list(s, 'My List', 'description of my list', private=False) account.create_list(s, 'My List', 'description of my list', private=False)
update_list(s, 123456, 'My Updated List', 'some updated description', private=False) account.update_list(s, 123456, 'My Updated List', 'some updated description', private=False)
update_list_banner(s, 123456, 'test.png') account.update_list_banner(s, 123456, 'test.png')
delete_list_banner(s, 123456) account.delete_list_banner(s, 123456)
add_list_member(s, 123456, 50393960) account.add_list_member(s, 123456, 50393960)
remove_list_member(s, 123456, 50393960) account.remove_list_member(s, 123456, 50393960)
delete_list(s, 123456) account.delete_list(s, 123456)
pin_list(s, 123456) account.pin_list(s, 123456)
unpin_list(s, 123456) account.unpin_list(s, 123456)
# refresh all pinned lists in this order # refresh all pinned lists in this order
update_pinned_lists(s, [123,234,345,456]) account.update_pinned_lists(s, [123, 234, 345, 456])
# unpin all lists # unpin all lists
update_pinned_lists(s, []) account.update_pinned_lists(s, [])
# example configuration # example configuration
update_account_settings(s, { account.update_settings(s, {
"address_book_live_sync_enabled": False, "address_book_live_sync_enabled": False,
"allow_ads_personalization": False, "allow_ads_personalization": False,
"allow_authenticated_periscope_requests": True, "allow_authenticated_periscope_requests": True,
@@ -142,7 +146,7 @@ setup(
}) })
# example configuration # example configuration
update_search_settings(s, { account.update_search_settings(s, {
"optInFiltering": True, # filter out nsfw content "optInFiltering": True, # filter out nsfw content
"optInBlocking": True, # filter out blocked accounts "optInBlocking": True, # filter out blocked accounts
}) })
@@ -155,48 +159,43 @@ setup(
#### Get all user/tweet data #### Get all user/tweet data
```python ```python
from twitter.scrape import * from twitter import scraper
from twitter.login import login from twitter.login import login
usr, pwd = ..., ... username,password = ...,...
s = login(usr, pwd) # session s = login(username, password) # session
user_ids = [...]
usernames = [...]
tweet_ids = [...]
####### User Data ######## ####### User Data ########
users = get_user_by_screen_name(s, usernames) users = scraper.get_user_by_screen_name(s, ['bob123', 'jim456', 'stanley789'])
tweets = get_user_tweets(s, user_ids) tweets = scraper.get_user_tweets(s, [123, 234, 345])
likes = get_likes(s, user_ids) likes = scraper.get_likes(s, [123, 234, 345])
tweets_and_replies = get_tweets_and_replies(s, user_ids) tweets_and_replies = scraper.get_tweets_and_replies(s, [123, 234, 345])
media = get_media(s, user_ids) media = scraper.get_media(s, [123, 234, 345])
following = get_following(s, user_ids) following = scraper.get_following(s, [123, 234, 345])
followers = get_followers(s, user_ids) followers = scraper.get_followers(s, [123, 234, 345])
######## Tweet Data ######## ######## Tweet Data ########
tweet = get_tweet_by_rest_id(s, tweet_ids) tweet = scraper.get_tweets_by_rest_id(s, [456, 567, 678])
tweet_detail = get_tweets(s, tweet_ids) tweet_detail = scraper.get_tweets(s, [456, 567, 678])
retweeters = get_retweeters(s, tweet_ids) retweeters = scraper.get_retweeters(s, [456, 567, 678])
favoriters = get_favoriters(s, tweet_ids) favoriters = scraper.get_favoriters(s, [456, 567, 678])
download_media(s, tweet_ids) scraper.download_media(s, [456, 567, 678])
``` ```
#### Most recent ~100 results of user/tweet data #### Most recent ~100 results of user/tweet data
```python ```python
from twitter.login import login from twitter.login import login
from twitter.scrape import query
from twitter.constants import Operation from twitter.constants import Operation
from twitter import scraper
from functools import partial from functools import partial
username, password = ..., ... username, password = ..., ...
session = login(username, password) session = login(username, password)
user_ids = [123, 234, 345, 456] user_ids = [123, 234, 345, 456]
user_query = partial(query, session, user_ids) user_query = partial(scraper.query, session, user_ids)
tweets = user_query(Operation.Data.UserTweets) tweets = user_query(Operation.Data.UserTweets)
likes = user_query(Operation.Data.Likes) likes = user_query(Operation.Data.Likes)
@@ -218,15 +217,6 @@ setup(
'ios android', 'ios android',
) )
``` ```
- search results are output to `~/data/raw`
- ~400 search results rate limiting occurs
**Search Operators Reference**
https://developer.twitter.com/en/docs/twitter-api/v1/rules-and-filtering/search-operators
https://developer.twitter.com/en/docs/twitter-api/tweets/search/integrate/build-a-query
'''), '''),
long_description_content_type='text/markdown', long_description_content_type='text/markdown',
author="Trevor Hobenshield", author="Trevor Hobenshield",

View File

@@ -3,6 +3,7 @@ import hashlib
import inspect import inspect
import logging.config import logging.config
import mimetypes import mimetypes
import random
import sys import sys
import time import time
from copy import deepcopy from copy import deepcopy
@@ -15,21 +16,23 @@ import ujson
from requests import Session, Response from requests import Session, Response
from tqdm import tqdm from tqdm import tqdm
from .config.log_config import log_config from .config.log import log_config
from .config.operations import operations from .config.operations import operations
from .config.settings import * from .config.settings import *
from .constants import Operation from .constants import *
from .utils import get_headers, build_query from .utils import get_headers, build_query
try: try:
if get_ipython().__class__.__name__ == 'ZMQInteractiveShell': if get_ipython().__class__.__name__ == 'ZMQInteractiveShell':
import nest_asyncio import nest_asyncio
nest_asyncio.apply() nest_asyncio.apply()
except: except:
... ...
if sys.platform != 'win32': if sys.platform != 'win32':
import uvloop import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
else: else:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
@@ -96,73 +99,19 @@ def api(session: Session, path: str, settings: dict) -> Response:
return r return r
def upload_media(session: Session, filename: str, is_dm: bool = False, is_profile=False) -> int: @log(info=['json'])
if is_profile: def dm(session: Session, receivers: list[int], text: str, filename: str = '') -> Response:
url = 'https://upload.twitter.com/i/media/upload.json' name, _ = Operation.Account.useSendMessageMutation
params = deepcopy(operations[name])
qid = params['queryId']
params['variables']['target'] = {"participant_ids": receivers}
params['variables']['requestId'] = str(uuid1(getnode())) # can be anything
url = f"https://api.twitter.com/graphql/{qid}/{name}"
if filename:
media_id = upload_media(session, filename, is_dm=True)
params['variables']['message']['media'] = {'id': media_id, 'text': text}
else: else:
url = 'https://upload.twitter.com/1.1/media/upload.json' params['variables']['message']['text'] = {'text': text}
file = Path(filename)
total_bytes = file.stat().st_size
headers = get_headers(session)
upload_type = 'dm' if is_dm else 'tweet'
media_type = mimetypes.guess_type(file)[0]
media_category = f'{upload_type}_{media_type.split("/")[0]}'
if media_category in {'dm_image', 'tweet_image'} and total_bytes > MAX_IMAGE_SIZE:
raise Exception(f'Image too large: max is {(MAX_IMAGE_SIZE / 1e6):.2f} MB')
if media_category in {'dm_gif', 'tweet_gif'} and total_bytes > MAX_GIF_SIZE:
raise Exception(f'GIF too large: max is {(MAX_GIF_SIZE / 1e6):.2f} MB')
if media_category in {'dm_video', 'tweet_video'} and total_bytes > MAX_VIDEO_SIZE:
raise Exception(f'Video too large: max is {(MAX_VIDEO_SIZE / 1e6):.2f} MB')
data = {'command': 'INIT', 'media_type': media_type, 'total_bytes': total_bytes, 'media_category': media_category}
r = session.post(url=url, headers=headers, data=data)
media_id = r.json()['media_id']
desc = f"uploading: {file.name}"
with tqdm(total=total_bytes, desc=desc, unit='B', unit_scale=True, unit_divisor=1024) as pbar:
with open(file, 'rb') as f:
i = 0
while chunk := f.read(4 * 1024 * 1024): # todo: arbitrary max size for now
data = {'command': 'APPEND', 'media_id': media_id, 'segment_index': i}
files = {'media': chunk}
r = session.post(url=url, headers=headers, data=data, files=files)
if r.status_code < 200 or r.status_code > 299:
logger.debug(f'{r.status_code} {r.text}')
raise Exception('Upload failed')
i += 1
pbar.update(f.tell() - pbar.n)
data = {'command': 'FINALIZE', 'media_id': media_id, 'allow_async': 'true'}
if is_dm:
data |= {'original_md5': hashlib.md5(file.read_bytes()).hexdigest()}
r = session.post(url=url, headers=headers, data=data)
logger.debug(f'processing, please wait...')
processing_info = r.json().get('processing_info')
while processing_info:
state = processing_info['state']
if state == 'succeeded':
break
if state == 'failed':
raise Exception('Media processing failed')
check_after_secs = processing_info['check_after_secs']
# logger.debug(f'{check_after_secs = }')
time.sleep(check_after_secs)
params = {'command': 'STATUS', 'media_id': media_id}
r = session.get(url=url, headers=headers, params=params)
processing_info = r.json().get('processing_info')
logger.debug('processing complete')
return media_id
@log(info=['text'])
def add_alt_text(session: Session, media_id: int, text: str) -> Response:
params = {"media_id": media_id, "alt_text": {"text": text}}
url = 'https://api.twitter.com/1.1/media/metadata/create.json'
r = session.post(url, headers=get_headers(session), json=params) r = session.post(url, headers=get_headers(session), json=params)
return r return r
@@ -203,6 +152,101 @@ def tweet(session: Session, text: str, media: list[dict | str] = None, **kwargs)
return r return r
@log(info=['json'])
def create_poll(session: Session, text: str, choices: list[str], poll_duration: int) -> Response:
    """
    Create a poll card and publish it as a tweet.

    @param session: authenticated session
    @param text: tweet text accompanying the poll
    @param choices: poll option labels (up to 4)
    @param poll_duration: poll lifetime in minutes (max 10080)
    @return: response from posting the tweet that carries the poll card
    """
    card_data = {
        "twitter:card": "poll4choice_text_only",
        "twitter:api:api:endpoint": "1",
        "twitter:long:duration_minutes": poll_duration  # max: 10080
    }
    card_data |= {f"twitter:string:choice{i + 1}_label": label
                  for i, label in enumerate(choices)}
    headers = get_headers(session)
    headers['content-type'] = 'application/x-www-form-urlencoded'
    card_response = session.post('https://caps.twitter.com/v2/cards/create.json',
                                 headers=headers,
                                 params={'card_data': ujson.dumps(card_data)})
    card_uri = card_response.json()['card_uri']
    return tweet(session, text, poll_params={'card_uri': card_uri})
def check_media(category: str, total_bytes: int) -> None:
    """
    Validate an upload's size against the limit for its media category.

    @param category: media category string, e.g. 'tweet_image' or 'dm_video'
    @param total_bytes: size of the file being uploaded
    @raises Exception: when the file exceeds the limit for a matching type
    """
    fmt = lambda x: f'{(x / 1e6):.2f} MB'
    # each Media.Type.* entry is a (name, max_size) pair
    for name, size in (Media.Type.image, Media.Type.gif, Media.Type.video):
        if name in category and total_bytes > size:
            raise Exception(f'cannot upload {fmt(total_bytes)} {name}: max {name} size is {fmt(size)}')
def upload_media(session: Session, filename: str, is_dm: bool = False, is_profile=False) -> int:
    """
    Upload a media file via Twitter's chunked upload flow and return its media_id.

    Runs INIT -> APPEND (chunked) -> FINALIZE, then polls STATUS until
    processing reports success or failure.

    @param session: authenticated session
    @param filename: path to the media file
    @param is_dm: upload is destined for a DM (affects category and adds an md5)
    @param is_profile: use the profile-media upload endpoint instead of 1.1
    @return: the media_id assigned by the upload endpoint
    @raises Exception: on oversized media, a failed APPEND, or failed processing
    """
    if is_profile:
        url = 'https://upload.twitter.com/i/media/upload.json'
    else:
        url = 'https://upload.twitter.com/1.1/media/upload.json'
    file = Path(filename)
    total_bytes = file.stat().st_size
    headers = get_headers(session)
    upload_type = 'dm' if is_dm else 'tweet'
    media_type = mimetypes.guess_type(file)[0]
    # category is e.g. 'tweet_image', 'dm_video' — derived from MIME major type
    media_category = f'{upload_type}_{media_type.split("/")[0]}'
    check_media(media_category, total_bytes)
    data = {'command': 'INIT', 'media_type': media_type, 'total_bytes': total_bytes, 'media_category': media_category}
    r = session.post(url=url, headers=headers, data=data)
    media_id = r.json()['media_id']
    desc = f"uploading: {file.name}"
    with tqdm(total=total_bytes, desc=desc, unit='B', unit_scale=True, unit_divisor=1024) as pbar:
        with open(file, 'rb') as f:
            i = 0
            while chunk := f.read(UPLOAD_CHUNK_SIZE):  # todo: arbitrary max size for now
                data = {'command': 'APPEND', 'media_id': media_id, 'segment_index': i}
                files = {'media': chunk}
                r = session.post(url=url, headers=headers, data=data, files=files)
                if r.status_code < 200 or r.status_code > 299:
                    logger.debug(f'{r.status_code} {r.text}')
                    raise Exception('Upload failed')
                i += 1
                pbar.update(f.tell() - pbar.n)
    data = {'command': 'FINALIZE', 'media_id': media_id, 'allow_async': 'true'}
    if is_dm:
        # DM uploads additionally send the file's md5 for integrity checking
        data |= {'original_md5': hashlib.md5(file.read_bytes()).hexdigest()}
    r = session.post(url=url, headers=headers, data=data)
    logger.debug(f'processing, please wait...')
    processing_info = r.json().get('processing_info')
    while processing_info:
        state = processing_info['state']
        logger.debug(f'{processing_info = }')
        if state == MEDIA_UPLOAD_SUCCEED:
            break
        if state == MEDIA_UPLOAD_FAIL:
            raise Exception('Media processing failed')
        # server may omit check_after_secs; fall back to a short random wait
        check_after_secs = processing_info.get('check_after_secs', random.randint(1, 5))
        time.sleep(check_after_secs)
        params = {'command': 'STATUS', 'media_id': media_id}
        r = session.get(url=url, headers=headers, params=params)
        processing_info = r.json().get('processing_info')
    logger.debug('processing complete')
    return media_id
@log(info=['text'])
def add_alt_text(session: Session, media_id: int, text: str) -> Response:
    """
    Attach alt text to an already-uploaded media item.

    @param session: authenticated session
    @param media_id: id returned by upload_media
    @param text: the alt text to attach
    @return: response from the media metadata endpoint
    """
    payload = {"media_id": media_id, "alt_text": {"text": text}}
    return session.post(
        'https://api.twitter.com/1.1/media/metadata/create.json',
        headers=get_headers(session),
        json=payload,
    )
def comment(session: Session, tweet_id: int, text: str, media: list[dict | str] = None) -> Response: def comment(session: Session, tweet_id: int, text: str, media: list[dict | str] = None) -> Response:
params = {"reply": {"in_reply_to_tweet_id": tweet_id, "exclude_reply_user_ids": []}} params = {"reply": {"in_reply_to_tweet_id": tweet_id, "exclude_reply_user_ids": []}}
return tweet(session, text, media, reply_params=params) return tweet(session, text, media, reply_params=params)
@@ -249,211 +293,6 @@ def unbookmark(session: Session, tweet_id: int) -> Response:
return gql(session, Operation.Account.DeleteBookmark, {'tweet_id': tweet_id}) return gql(session, Operation.Account.DeleteBookmark, {'tweet_id': tweet_id})
@log(info=['json'])
def follow(session: Session, user_id: int) -> Response:
    """Follow a user via the v1.1 friendships/create endpoint."""
    payload = follow_settings | {"user_id": user_id}
    return api(session, 'friendships/create.json', payload)
@log(info=['json'])
def unfollow(session: Session, user_id: int) -> Response:
    """Unfollow a user via the v1.1 friendships/destroy endpoint."""
    payload = follow_settings | {"user_id": user_id}
    return api(session, 'friendships/destroy.json', payload)
@log(info=['json'])
def mute(session: Session, user_id: int) -> Response:
    """Mute a user via the v1.1 mutes/users/create endpoint."""
    return api(session, 'mutes/users/create.json', {'user_id': user_id})
@log(info=['json'])
def unmute(session: Session, user_id: int) -> Response:
    """Unmute a user via the v1.1 mutes/users/destroy endpoint."""
    return api(session, 'mutes/users/destroy.json', {'user_id': user_id})
@log(info=['json'])
def enable_notifications(session: Session, user_id: int) -> Response:
    """Turn on device notifications for a user (friendships/update, device='true')."""
    payload = notification_settings | {'id': user_id, 'device': 'true'}
    return api(session, 'friendships/update.json', payload)
@log(info=['json'])
def disable_notifications(session: Session, user_id: int) -> Response:
    """Turn off device notifications for a user (friendships/update, device='false')."""
    payload = notification_settings | {'id': user_id, 'device': 'false'}
    return api(session, 'friendships/update.json', payload)
@log(info=['json'])
def block(session: Session, user_id: int) -> Response:
    """Block a user via the v1.1 blocks/create endpoint."""
    return api(session, 'blocks/create.json', {'user_id': user_id})
@log(info=['json'])
def unblock(session: Session, user_id: int) -> Response:
    """Unblock a user via the v1.1 blocks/destroy endpoint."""
    return api(session, 'blocks/destroy.json', {'user_id': user_id})
@log(info=['json'])
def stats(session: Session, rest_id: int) -> Response:
    """Fetch tweet stats for an account by rest_id (possibly a private endpoint)."""
    op_name, _ = Operation.Account.TweetStats
    op_params = deepcopy(operations[op_name])
    op_params['variables']['rest_id'] = rest_id
    url = (f"https://api.twitter.com/graphql/"
           f"{op_params['queryId']}/{op_name}?{build_query(op_params)}")
    return session.get(url, headers=get_headers(session))
@log(info=['json'])
def dm(session: Session, receivers: list[int], text: str, filename: str = '') -> Response:
    """
    Send a direct message to one or more users via the
    useSendMessageMutation GraphQL operation.

    @param session: authenticated session
    @param receivers: recipient user ids (one id = 1:1 DM, several = group DM)
    @param text: message text
    @param filename: optional media file to attach (uploaded as DM media)
    @return: response from the GraphQL endpoint
    """
    name, _ = Operation.Account.useSendMessageMutation
    params = deepcopy(operations[name])
    qid = params['queryId']
    params['variables']['target'] = {"participant_ids": receivers}
    params['variables']['requestId'] = str(uuid1(getnode()))  # can be anything
    url = f"https://api.twitter.com/graphql/{qid}/{name}"
    if filename:
        # with media, the text rides along inside the media payload
        media_id = upload_media(session, filename, is_dm=True)
        params['variables']['message']['media'] = {'id': media_id, 'text': text}
    else:
        params['variables']['message']['text'] = {'text': text}
    r = session.post(url, headers=get_headers(session), json=params)
    return r
@log(info=['json'])
def update_profile_image(session: Session, filename: str) -> Response:
    """
    Upload an image and set it as the account's profile picture.

    @param session: authenticated session
    @param filename: path to the image file
    @return: response from the update_profile_image endpoint
    """
    media_id = upload_media(session, filename, is_profile=True)
    return session.post(
        'https://api.twitter.com/1.1/account/update_profile_image.json',
        headers=get_headers(session),
        params={'media_id': media_id},
    )
@log
def update_profile_banner(session: Session, filename: str) -> Response:
    """
    Upload an image and set it as the account's profile banner.

    @param session: authenticated session
    @param filename: path to the image file
    @return: response from the update_profile_banner endpoint
    """
    media_id = upload_media(session, filename, is_profile=True)
    return session.post(
        'https://api.twitter.com/1.1/account/update_profile_banner.json',
        headers=get_headers(session),
        params={'media_id': media_id},
    )
@log
def update_profile_info(session: Session, **kwargs) -> Response:
    """
    Update profile fields (e.g. name, description, location), passed through
    as query parameters to the update_profile endpoint.

    @param session: authenticated session
    @param kwargs: profile fields to update
    @return: response from the update_profile endpoint
    """
    return session.post(
        'https://api.twitter.com/1.1/account/update_profile.json',
        headers=get_headers(session),
        params=kwargs,
    )
@log(info=['json'])
def create_poll(session: Session, text: str, choices: list[str], poll_duration: int) -> Response:
    """
    Create a poll card via caps.twitter.com, then publish it as a tweet.

    @param session: authenticated session
    @param text: tweet text accompanying the poll
    @param choices: poll option labels (up to 4)
    @param poll_duration: poll lifetime in minutes (max 10080)
    @return: response from posting the tweet that carries the poll card
    """
    options = {
        "twitter:card": "poll4choice_text_only",
        # NOTE(review): duplicated 'api:' segment looks suspicious — confirm key against the cards API
        "twitter:api:api:endpoint": "1",
        "twitter:long:duration_minutes": poll_duration  # max: 10080
    }
    for i, c in enumerate(choices):
        options[f"twitter:string:choice{i + 1}_label"] = c
    headers = get_headers(session)
    # card_data is sent form-encoded, not JSON
    headers['content-type'] = 'application/x-www-form-urlencoded'
    url = 'https://caps.twitter.com/v2/cards/create.json'
    r = session.post(url, headers=headers, params={'card_data': ujson.dumps(options)})
    card_uri = r.json()['card_uri']
    r = tweet(session, text, poll_params={'card_uri': card_uri})
    return r
@log(info=['json'])
def pin(session: Session, tweet_id: int) -> Response:
    """Pin a tweet to the top of the account's profile."""
    return api(session, 'account/pin_tweet.json',
               {'tweet_mode': 'extended', 'id': tweet_id})
@log(info=['json'])
def unpin(session: Session, tweet_id: int) -> Response:
    """Unpin a tweet from the account's profile."""
    return api(session, 'account/unpin_tweet.json',
               {'tweet_mode': 'extended', 'id': tweet_id})
@log(info=['text'])
def update_search_settings(session: Session, settings: dict) -> Response:
    """
    Update account search settings

    @param session: authenticated session
    @param settings: search filtering settings to enable/disable
    @return: response from the searchSafety strato endpoint
    """
    # the 'twid' cookie has the form 'u=<numeric id>' (possibly quoted);
    # take the part after '=' as the user id for the per-user strato column
    twid = int(session.cookies.get_dict()['twid'].split('=')[-1].strip('"'))
    headers = get_headers(session=session)
    r = session.post(
        url=f'https://api.twitter.com/1.1/strato/column/User/{twid}/search/searchSafety',
        headers=headers,
        json=settings,
    )
    return r
@log(info=['json'])
def update_account_settings(session: Session, settings: dict) -> Response:
    """
    Update account settings

    @param session: authenticated session
    @param settings: settings to enable/disable
    @return: response from the account/settings endpoint
    """
    return api(session, 'account/settings.json', settings)
@log(info=['json'])
def remove_interests(session: Session, *args):
    """
    Disable interests used for ads personalization.

    Pass 'all' to disable every currently recorded interest, or one or more
    interest display names to disable only those.

    @param session: authenticated session
    @param args: 'all', or interest display names to disable
    @return: response from the p13n_preferences endpoint
    """
    url = 'https://api.twitter.com/1.1/account/personalization/twitter_interests.json'
    r = session.get(url, headers=get_headers(session))
    current_interests = r.json()['interested_in']
    # bug fix: *args is always a tuple, so the old test `args == 'all'` could
    # never be true and the "disable everything" path was unreachable
    if 'all' in args:
        disabled_interests = [x['id'] for x in current_interests]
    else:
        disabled_interests = [x['id'] for x in current_interests if x['display_name'] in args]
    payload = {
        "preferences": {
            "interest_preferences": {
                "disabled_interests": disabled_interests,
                "disabled_partner_interests": []
            }
        }
    }
    url = 'https://api.twitter.com/1.1/account/personalization/p13n_preferences.json'
    r = session.post(url, headers=get_headers(session), json=payload)
    return r
@log(info=['json'])
def __get_lists(session: Session) -> Response:
    """Fetch the account's lists-management page timeline via GraphQL."""
    op_name, _ = Operation.Account.ListsManagementPageTimeline
    op_params = deepcopy(operations[op_name])
    url = (f"https://api.twitter.com/graphql/"
           f"{op_params['queryId']}/{op_name}?{build_query(op_params)}")
    return session.get(url, headers=get_headers(session))
@log(info=['json']) @log(info=['json'])
def create_list(session: Session, name: str, description: str, private: bool) -> Response: def create_list(session: Session, name: str, description: str, private: bool) -> Response:
variables = { variables = {
@@ -533,3 +372,171 @@ def unfollow_topic(session: Session, topic_id: int) -> Response:
@log(info=['json']) @log(info=['json'])
def follow_topic(session: Session, topic_id: int) -> Response: def follow_topic(session: Session, topic_id: int) -> Response:
return gql(session, Operation.Account.TopicFollow, {'topicId': str(topic_id)}) return gql(session, Operation.Account.TopicFollow, {'topicId': str(topic_id)})
@log(info=['json'])
def follow(session: Session, user_id: int) -> Response:
    """Follow a user by numeric id."""
    payload = {**follow_settings, 'user_id': user_id}
    return api(session, 'friendships/create.json', payload)
@log(info=['json'])
def unfollow(session: Session, user_id: int) -> Response:
    """Unfollow a user by numeric id."""
    payload = {**follow_settings, 'user_id': user_id}
    return api(session, 'friendships/destroy.json', payload)
@log(info=['json'])
def mute(session: Session, user_id: int) -> Response:
    """Mute a user by numeric id."""
    return api(session, 'mutes/users/create.json', {'user_id': user_id})
@log(info=['json'])
def unmute(session: Session, user_id: int) -> Response:
    """Unmute a user by numeric id."""
    return api(session, 'mutes/users/destroy.json', {'user_id': user_id})
@log(info=['json'])
def enable_notifications(session: Session, user_id: int) -> Response:
    """Turn on device notifications for a user's activity."""
    payload = {**notification_settings, 'id': user_id, 'device': 'true'}
    return api(session, 'friendships/update.json', payload)
@log(info=['json'])
def disable_notifications(session: Session, user_id: int) -> Response:
    """Turn off device notifications for a user's activity."""
    payload = {**notification_settings, 'id': user_id, 'device': 'false'}
    return api(session, 'friendships/update.json', payload)
@log(info=['json'])
def block(session: Session, user_id: int) -> Response:
    """Block a user by numeric id."""
    return api(session, 'blocks/create.json', {'user_id': user_id})
@log(info=['json'])
def unblock(session: Session, user_id: int) -> Response:
    """Unblock a user by numeric id."""
    return api(session, 'blocks/destroy.json', {'user_id': user_id})
@log(info=['json'])
def pin(session: Session, tweet_id: int) -> Response:
    """Pin a tweet to the authenticated account's profile."""
    payload = {'tweet_mode': 'extended', 'id': tweet_id}
    return api(session, 'account/pin_tweet.json', payload)
@log(info=['json'])
def unpin(session: Session, tweet_id: int) -> Response:
    """Remove the pinned tweet from the authenticated account's profile."""
    payload = {'tweet_mode': 'extended', 'id': tweet_id}
    return api(session, 'account/unpin_tweet.json', payload)
@log(info=['json'])
def stats(session: Session, rest_id: int) -> Response:
    """
    Fetch tweet statistics for a user via the TweetStats GraphQL operation.

    NOTE(review): original comment says "private endpoint?" — access not
    confirmed from this code alone.
    """
    name, _ = Operation.Account.TweetStats
    op = deepcopy(operations[name])
    op['variables']['rest_id'] = rest_id
    query_string = build_query(op)
    url = f"https://api.twitter.com/graphql/{op['queryId']}/{name}?{query_string}"
    return session.get(url, headers=get_headers(session))
@log(info=['json'])
def remove_interests(session: Session, *args):
    """
    Disable (remove) interests from account personalization.

    @param session: authenticated session
    @param args: interest display names to disable, or the single string
        'all' to disable every currently-listed interest
    @return: response from the p13n preferences endpoint
    """
    url = 'https://api.twitter.com/1.1/account/personalization/twitter_interests.json'
    r = session.get(url, headers=get_headers(session))
    current_interests = r.json()['interested_in']
    # Bug fix: `args` is a tuple, so the old `args == 'all'` comparison was
    # always False and the disable-everything path was unreachable.
    if 'all' in args:
        disabled_interests = [x['id'] for x in current_interests]
    else:
        disabled_interests = [x['id'] for x in current_interests if x['display_name'] in args]
    payload = {
        "preferences": {
            "interest_preferences": {
                "disabled_interests": disabled_interests,
                "disabled_partner_interests": []
            }
        }
    }
    url = 'https://api.twitter.com/1.1/account/personalization/p13n_preferences.json'
    r = session.post(url, headers=get_headers(session), json=payload)
    return r
@log(info=['json'])
def update_profile_image(session: Session, filename: str) -> Response:
    """Upload an image file and set it as the account's profile picture."""
    media_id = upload_media(session, filename, is_profile=True)
    return session.post(
        'https://api.twitter.com/1.1/account/update_profile_image.json',
        headers=get_headers(session),
        params={'media_id': media_id},
    )
@log
def update_profile_banner(session: Session, filename: str) -> Response:
    """Upload an image file and set it as the account's profile banner."""
    # NOTE(review): sibling endpoints use `@log(info=['json'])`; confirm the
    # bare `@log` form here is supported by the decorator.
    media_id = upload_media(session, filename, is_profile=True)
    return session.post(
        'https://api.twitter.com/1.1/account/update_profile_banner.json',
        headers=get_headers(session),
        params={'media_id': media_id},
    )
@log
def update_profile_info(session: Session, **kwargs) -> Response:
    """Update profile fields; kwargs are passed straight through as query params."""
    # NOTE(review): sibling endpoints use `@log(info=['json'])`; confirm the
    # bare `@log` form here is supported by the decorator.
    return session.post(
        'https://api.twitter.com/1.1/account/update_profile.json',
        headers=get_headers(session),
        params=kwargs,
    )
@log(info=['text'])
def update_search_settings(session: Session, settings: dict) -> Response:
    """
    Update account search settings.

    @param session: authenticated session
    @param settings: search filtering settings to enable/disable
    @return: response from the search-safety endpoint
    """
    # The numeric user id is embedded in the `twid` cookie (e.g. 'u=12345').
    raw_twid = session.cookies.get_dict()['twid']
    user_id = int(raw_twid.split('=')[-1].strip('"'))
    return session.post(
        url=f'https://api.twitter.com/1.1/strato/column/User/{user_id}/search/searchSafety',
        headers=get_headers(session=session),
        json=settings,
    )
@log(info=['json'])
def update_settings(session: Session, settings: dict) -> Response:
    """
    Update account settings.

    @param session: authenticated session
    @param settings: settings to enable/disable
    @return: response from the settings endpoint
    """
    endpoint = 'account/settings.json'
    return api(session, endpoint, settings)
# @log(info=['json'])
# def __get_lists(session: Session) -> Response:
# name, _ = Operation.Account.ListsManagementPageTimeline
# params = deepcopy(operations[name])
# qid = params['queryId']
# query = build_query(params)
# url = f"https://api.twitter.com/graphql/{qid}/{name}?{query}"
# r = session.get(url, headers=get_headers(session))
# return r

View File

@@ -1,13 +1,3 @@
MAX_IMAGE_SIZE = 5_242_880 # ~5 MB
MAX_GIF_SIZE = 15_728_640 # ~15 MB
MAX_VIDEO_SIZE = 536_870_912 # ~530 MB
CHUNK_SIZE = 8192
BOLD = '\u001b[1m'
SUCCESS = '\u001b[32m'
WARN = '\u001b[31m'
RESET = '\u001b[0m'
account_settings = { account_settings = {
"address_book_live_sync_enabled": False, "address_book_live_sync_enabled": False,
"allow_ads_personalization": False, "allow_ads_personalization": False,
@@ -143,4 +133,4 @@ search_config = {
"spelling_corrections": 1, "spelling_corrections": 1,
"include_ext_edit_control": "true", "include_ext_edit_control": "true",
"ext": "mediaStats,highlightedLabel,hasNftAvatar,voiceInfo,birdwatchPivot,enrichments,superFollowMetadata,unmentionInfo,editControl,collab_control,vibe" "ext": "mediaStats,highlightedLabel,hasNftAvatar,voiceInfo,birdwatchPivot,enrichments,superFollowMetadata,unmentionInfo,editControl,collab_control,vibe"
} }

View File

@@ -1,22 +1,40 @@
import json
from enum import Enum, member from enum import Enum, member
BOLD = '\u001b[1m'
SUCCESS = '\u001b[32m'
WARN = '\u001b[31m'
RESET = '\u001b[0m'
UPLOAD_CHUNK_SIZE = 4 * 1024 * 1024
MEDIA_UPLOAD_SUCCEED = 'succeeded'
MEDIA_UPLOAD_FAIL = 'failed'
class Value: class Value:
__slots__ = 'value'
def __init__(self, value: any = None): def __init__(self, value: any = None):
self.value = value self.value = value
class Operation(Enum): class CustomEnum(Enum):
"""
Enum with repeated values for GraphQL operations
"""
def __getattr__(self, item): def __getattr__(self, item):
if item != "_value_": if item != "_value_":
attr = getattr(self.value, item) attr = getattr(self.value, item)
return attr.name, attr.value.value return attr.name, attr.value.value
raise AttributeError raise AttributeError
class Media(CustomEnum):
@member
class Type(Enum):
image = Value(5_242_880) # ~5 MB
gif = Value(15_728_640) # ~15 MB
video = Value(536_870_912) # ~530 MB
class Operation(CustomEnum):
@member @member
class Data(Enum): class Data(Enum):
# tweet # tweet

View File

@@ -1,22 +1,25 @@
import sys import sys
from requests import Session from requests import Session
from .constants import SUCCESS, WARN, BOLD, RESET
def update_token(session: Session, key: str, url: str, payload: dict) -> Session: def update_token(session: Session, key: str, url: str, payload: dict) -> Session:
headers = { try:
"authorization": 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA', headers = {
"content-type": "application/json", "authorization": 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
"user-agent": 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36', "content-type": "application/json",
"x-guest-token": session.cookies.get('guest_token'), "user-agent": 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36',
"x-csrf-token": session.cookies.get("ct0"), "x-guest-token": session.cookies.get('guest_token'),
"x-twitter-auth-type": "OAuth2Session" if session.cookies.get("auth_token") else '', "x-csrf-token": session.cookies.get("ct0"),
"x-twitter-active-user": "yes", "x-twitter-auth-type": "OAuth2Session" if session.cookies.get("auth_token") else '',
"x-twitter-client-language": 'en', "x-twitter-active-user": "yes",
} "x-twitter-client-language": 'en',
r = session.post(url, headers=headers, json=payload).json() }
status = f'\u001b[32mSUCCESS' if r.get('guest_token') or r.get('flow_token') else f'\u001b[31mFAILED' r = session.post(url, headers=headers, json=payload).json()
print(f'{status}\u001b[0m {sys._getframe(1).f_code.co_name}') # check response data # print(f'{SUCCESS}{sys._getframe(1).f_code.co_name}{RESET}')
session.cookies.set(key, r[key]) session.cookies.set(key, r[key])
except KeyError as e:
print(f'[{WARN}FAILED{RESET}] failed to update token at {BOLD}{sys._getframe(1).f_code.co_name}{RESET}')
return session return session
@@ -89,4 +92,6 @@ def login(username: str, password: str) -> Session:
"guest_token": None, "guest_token": None,
"flow_token": None, "flow_token": None,
}) })
return execute_login_flow(session) session = execute_login_flow(session)
print(f'[{SUCCESS}SUCCESS{RESET}] {BOLD}{username}{RESET} logged in successfully')
return session

View File

@@ -13,20 +13,22 @@ import ujson
from aiohttp import ClientSession, TCPConnector from aiohttp import ClientSession, TCPConnector
from .config.operations import operations from .config.operations import operations
from .config.log_config import log_config from .config.log import log_config
from .constants import Operation from .constants import *
from .login import Session from .login import Session
from .utils import find_key, build_query, get_headers from .utils import find_key, build_query, get_headers
try: try:
if get_ipython().__class__.__name__ == 'ZMQInteractiveShell': if get_ipython().__class__.__name__ == 'ZMQInteractiveShell':
import nest_asyncio import nest_asyncio
nest_asyncio.apply() nest_asyncio.apply()
except: except:
... ...
if sys.platform != 'win32': if sys.platform != 'win32':
import uvloop import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
else: else:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
@@ -74,7 +76,7 @@ def get_tweets(session: Session, ids: list[int], limit=math.inf):
# no pagination needed # no pagination needed
def get_tweet_by_rest_id(session: Session, ids: list[int]): def get_tweets_by_rest_id(session: Session, ids: list[int]):
return run(session, ids, Operation.Data.TweetResultByRestId) return run(session, ids, Operation.Data.TweetResultByRestId)
@@ -138,10 +140,15 @@ async def get(session: ClientSession, url: tuple) -> dict:
logger.debug(f'processing: {url}') logger.debug(f'processing: {url}')
try: try:
r = await session.get(api_url) r = await session.get(api_url)
limits = {k: v for k, v in r.headers.items() if 'x-rate-limit' in k}
logger.debug(f'{limits = }')
if r.status == 429:
logger.debug(f'rate limit exceeded: {url}')
return {}
data = await r.json() data = await r.json()
return {ID: identifier, **data} return {ID: identifier, **data}
except Exception as e: except Exception as e:
logger.debug(e) logger.debug(f'failed to download {url}: {e}')
async def pagination(session: Session, res: list, operation: tuple, limit: int) -> tuple: async def pagination(session: Session, res: list, operation: tuple, limit: int) -> tuple:
@@ -166,50 +173,53 @@ async def paginate(session: ClientSession, data: dict, operation: tuple, limit:
return itemContent['value'] # v2 cursor return itemContent['value'] # v2 cursor
return content['value'] # v1 cursor return content['value'] # v1 cursor
name, key = operation
params = deepcopy(operations[name])
qid = params['queryId']
ids = set()
counts = []
all_data = [] all_data = []
try:
name, key = operation
params = deepcopy(operations[name])
qid = params['queryId']
params['variables'][key] = data[ID] ids = set()
cursor = get_cursor(data) counts = []
while 1: params['variables'][key] = data[ID]
params['variables']['cursor'] = cursor cursor = get_cursor(data)
query = build_query(params)
url = f"https://api.twitter.com/graphql/{qid}/{name}?{query}"
# update csrf header - must be an easier way without importing yarl while 1:
if k := session.cookie_jar.__dict__['_cookies'].get('twitter.com'): params['variables']['cursor'] = cursor
if cookie := re.search('(?<=ct0\=)\w+(?=;)', str(k)): query = build_query(params)
session.headers.update({"x-csrf-token": cookie.group()}) url = f"https://api.twitter.com/graphql/{qid}/{name}?{query}"
_data = await backoff(lambda: session.get(url)) # code [353]: "This request requires a matching csrf cookie and header."
tagged_data = _data | {ID: data[ID]} r, _data = await backoff(lambda: session.get(url))
save_data([tagged_data], name) if csrf := r.cookies.get("ct0"):
all_data.append(tagged_data) session.headers.update({"x-csrf-token": csrf.value})
cursor = get_cursor(_data) session.cookie_jar.update_cookies(r.cookies)
logger.debug(f'{cursor = }')
ids |= set(find_key(tagged_data, 'rest_id'))
logger.debug(f'({data[ID]})\t{len(ids)} unique results')
counts.append(len(ids))
# followers/following have "0|" tagged_data = _data | {ID: data[ID]}
if not cursor or cursor.startswith('0|'): save_data([tagged_data], name)
logger.debug(f'[SUCCESS] done pagination\tlast cursor: {cursor}') all_data.append(tagged_data)
break cursor = get_cursor(_data)
if len(ids) >= limit: logger.debug(f'{cursor = }')
logger.debug(f'[SUCCESS] done pagination\tsurpassed limit of {limit} results') ids |= set(find_key(tagged_data, 'rest_id'))
break logger.debug(f'({data[ID]})\t{len(ids)} unique results')
# did last 5 requests return duplicate data? counts.append(len(ids))
if len(counts) > DUP_LIMIT and len(set(counts[-1:-DUP_LIMIT:-1])) == 1:
logger.debug(f'[SUCCESS] done pagination\tpast {DUP_LIMIT} requests returned duplicate data')
break
save_data(all_data, name) success_message = f'[{SUCCESS}SUCCESS{RESET}] done pagination'
# followers/following have "0|"
if not cursor or cursor.startswith('0|'):
logger.debug(f'{success_message}\tlast cursor: {cursor}')
break
if len(ids) >= limit:
logger.debug(f'{success_message}\tsurpassed limit of {limit} results')
break
# did last 5 requests return duplicate data?
if len(counts) > DUP_LIMIT and len(set(counts[-1:-DUP_LIMIT:-1])) == 1:
logger.debug(f'{success_message}\tpast {DUP_LIMIT} requests returned duplicate data')
break
except Exception as e:
logger.debug(f'paginate falied: {e}')
# save_data(all_data, name)
return all_data return all_data
@@ -218,10 +228,10 @@ async def backoff(fn, retries=12):
try: try:
r = await fn() r = await fn()
data = await r.json() data = await r.json()
return data return r, data
except Exception as e: except Exception as e:
if i == retries: if i == retries:
logger.debug(f'Max retries exceeded\n{e}') logger.debug(f'{WARN}Max retries exceeded{RESET}\n{e}')
return return
t = 2 ** i + random.random() t = 2 ** i + random.random()
logger.debug(f'retrying in {f"{t:.2f}"} seconds\t\t{e}') logger.debug(f'retrying in {f"{t:.2f}"} seconds\t\t{e}')
@@ -229,11 +239,14 @@ async def backoff(fn, retries=12):
def save_data(data: list, name: str = ''): def save_data(data: list, name: str = ''):
for d in data: try:
path = Path(f'data/raw/{d[ID]}') for d in data:
path.mkdir(parents=True, exist_ok=True) path = Path(f'data/raw/{d[ID]}')
with open(path / f'{time.time_ns()}_{name}.json', 'w') as fp: path.mkdir(parents=True, exist_ok=True)
ujson.dump(d, fp, indent=4) with open(path / f'{time.time_ns()}_{name}.json', 'w') as fp:
ujson.dump(d, fp, indent=4)
except KeyError as e:
logger.debug(f'failed to save data: {e}')
def download(session: Session, post_url: str, cdn_url: str, path: str = 'media', chunk_size: int = 4096) -> None: def download(session: Session, post_url: str, cdn_url: str, path: str = 'media', chunk_size: int = 4096) -> None:
@@ -259,7 +272,7 @@ def download(session: Session, post_url: str, cdn_url: str, path: str = 'media',
def download_media(session: Session, ids: list[int], photos: bool = True, videos: bool = True) -> None: def download_media(session: Session, ids: list[int], photos: bool = True, videos: bool = True) -> None:
res = get_tweet_by_rest_id(session, ids) res = get_tweets_by_rest_id(session, ids)
for r in res: for r in res:
user_id = find_key(r, 'user_results')[0]['result']['rest_id'] user_id = find_key(r, 'user_results')[0]['result']['rest_id']
url = f'https://twitter.com/{user_id}/status/{r[ID]}' # evaluates to username in browser url = f'https://twitter.com/{user_id}/status/{r[ID]}' # evaluates to username in browser

View File

@@ -12,7 +12,7 @@ from urllib.parse import quote, urlencode, parse_qs, urlsplit, urlunsplit
import aiohttp import aiohttp
import requests import requests
from .config.log_config import log_config from .config.log import log_config
from .config.settings import search_config from .config.settings import search_config
IN_PATH = Path('~/data/raw').expanduser() IN_PATH = Path('~/data/raw').expanduser()