1. 某个用户的所有推特内容
2. 包含某个关键词的推特
3. 用户的粉丝、关注者,以及下一层粉丝和关注者的信息
4. 按照热度、转发量爬取历史数据
5. 按照坐标、地理位置、经纬度、翻开等爬取推特
6. 爬取推特的评论、留言,以及留言者的信息
7. 用bot机器人评论他人的推特等......

import abc
import copy
import dataclasses
import datetime
import functools
import json
import logging
import requests
import time
import warnings


logger = logging.getLogger(__name__)


class _DeprecatedProperty:
    """Descriptor that forwards to a replacement getter and emits a FutureWarning.

    name is the deprecated attribute name, repl is a callable taking the
    instance and returning the value, replStr is the name of the replacement
    shown in the warning text.
    """

    def __init__(self, name, repl, replStr):
        self.name = name
        self.repl = repl
        self.replStr = replStr

    def __get__(self, obj, objType):
        if obj is None:
            # Access through the class rather than an instance: return the descriptor itself.
            return self
        warnings.warn(f'{self.name} is deprecated, use {self.replStr} instead', FutureWarning, stacklevel = 2)
        return self.repl(obj)


def _json_serialise_datetime(obj):
    """A JSON serialiser that converts datetime.datetime and datetime.date objects to ISO-8601 strings.

    Raises TypeError for any other type, as required of a json.dumps `default` hook.
    """

    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    raise TypeError(f'Object of type {type(obj)} is not JSON serializable')


def _json_dataclass_to_dict(obj):
    """Recursively convert dataclasses (and containers of them) into plain dicts.

    Dataclass instances become dicts with an extra '_type' key holding the
    fully qualified class name, one key per dataclass field, and one key per
    real `property` defined on the class. Tuples, lists, dicts and sets are
    converted element-wise; everything else is deep-copied.
    """

    if isinstance(obj, _JSONDataclass) or dataclasses.is_dataclass(obj):
        out = {}
        out['_type'] = f'{type(obj).__module__}.{type(obj).__name__}'
        for field in dataclasses.fields(obj):
            assert field.name != '_type'
            out[field.name] = _json_dataclass_to_dict(getattr(obj, field.name))
        # Add in (non-deprecated) properties; _DeprecatedProperty is not a `property`, so it is skipped.
        for k in dir(obj):
            if isinstance(getattr(type(obj), k, None), property):
                assert k != '_type'
                out[k] = _json_dataclass_to_dict(getattr(obj, k))
        return out
    elif isinstance(obj, (tuple, list)):
        return type(obj)(_json_dataclass_to_dict(x) for x in obj)
    elif isinstance(obj, dict):
        return {_json_dataclass_to_dict(k): _json_dataclass_to_dict(v) for k, v in obj.items()}
    elif isinstance(obj, set):
        return {_json_dataclass_to_dict(v) for v in obj}
    else:
        return copy.deepcopy(obj)


@dataclasses.dataclass
class _JSONDataclass:
    """A base class for dataclasses for conversion to JSON"""

    def json(self):
        """Convert the object to a JSON string"""

        out = _json_dataclass_to_dict(self)
        for key, value in list(out.items()):  # Modifying the dict below, so make a copy first
            if isinstance(value, IntWithGranularity):
                # Emit the plain number under the key and the granularity under '<key>.granularity'.
                out[key] = int(value)
                assert f'{key}.granularity' not in out, f'Granularity collision on {key}.granularity'
                out[f'{key}.granularity'] = value.granularity
        return json.dumps(out, default = _json_serialise_datetime)


@dataclasses.dataclass
class Item(_JSONDataclass):
    """An abstract base class for an item returned by the scraper's get_items generator.

    An item can really be anything. The string representation should be useful for the CLI output (e.g. a direct URL for the item).
    """

    @abc.abstractmethod
    def __str__(self):
        pass


@dataclasses.dataclass
class Entity(_JSONDataclass):
    """An abstract base class for an entity returned by the scraper's entity property.

    An entity is typically the account of a person or organisation. The string representation should be the preferred direct URL to the entity's page on the network.
    """

    @abc.abstractmethod
    def __str__(self):
        pass


class IntWithGranularity(int):
    """A number with an associated granularity

    For example, an IntWithGranularity(42000, 1000) represents a number on the order of 42000 with two significant digits, i.e. something counted with a granularity of 1000.
    """

    def __new__(cls, value, granularity, *args, **kwargs):
        obj = super().__new__(cls, value, *args, **kwargs)
        obj.granularity = granularity
        return obj

    def __reduce__(self):
        # Needed so that copy.deepcopy/pickle recreate the object with its granularity.
        return (IntWithGranularity, (int(self), self.granularity))


class URLItem(Item):
    """A generic item which only holds a URL string."""

    def __init__(self, url):
        self._url = url

    @property
    def url(self):
        return self._url

    def __str__(self):
        return self._url


class ScraperException(Exception):
    pass


class Scraper:
    """An abstract base class for a scraper."""

    name = None

    def __init__(self, retries = 3):
        self._retries = retries
        self._session = requests.Session()

    @abc.abstractmethod
    def get_items(self):
        """Iterator yielding Items."""
        pass

    def _get_entity(self):
        """Get the entity behind the scraper, if any.

        This is the method implemented by subclasses for doing the actual retrieval/entity object creation. For accessing the scraper's entity, use the entity property.
        """
        return None

    @functools.cached_property
    def entity(self):
        return self._get_entity()

    def _request(self, method, url, params = None, data = None, headers = None, timeout = 10, responseOkCallback = None, allowRedirects = True):
        """Send an HTTP request with retries and exponential backoff.

        responseOkCallback, if given, is called with the response and must
        return a (success, message) tuple; an unsuccessful response is retried
        like a transport error. Returns the response on success and raises
        ScraperException after `retries + 1` failed attempts.
        """
        for attempt in range(self._retries + 1):
            # The request is newly prepared on each retry because of potential cookie updates.
            req = self._session.prepare_request(requests.Request(method, url, params = params, data = data, headers = headers))
            logger.info(f'Retrieving {req.url}')
            logger.debug(f'... with headers: {headers!r}')
            if data:
                logger.debug(f'... with data: {data!r}')

            # Failures on the final attempt are logged as errors, earlier ones as info.
            if attempt < self._retries:
                retrying = ', retrying'
                level = logging.INFO
            else:
                retrying = ''
                level = logging.ERROR

            try:
                r = self._session.send(req, allow_redirects = allowRedirects, timeout = timeout)
            except requests.exceptions.RequestException as exc:
                logger.log(level, f'Error retrieving {req.url}: {exc!r}{retrying}')
            else:
                if responseOkCallback is not None:
                    success, msg = responseOkCallback(r)
                else:
                    success, msg = (True, None)
                msg = f': {msg}' if msg else ''

                if success:
                    logger.debug(f'{req.url} retrieved successfully{msg}')
                    return r
                logger.log(level, f'Error retrieving {req.url}{msg}{retrying}')

            if attempt < self._retries:
                sleepTime = 1.0 * 2**attempt  # exponential backoff: sleep 1 second after first attempt, 2 after second, 4 after third, etc.
                logger.info(f'Waiting {sleepTime:.0f} seconds')
                time.sleep(sleepTime)
        else:
            # for/else: reached only when every attempt failed (success returns from inside the loop).
            msg = f'{self._retries + 1} requests to {req.url} failed, giving up.'
            logger.fatal(msg)
            raise ScraperException(msg)
        raise RuntimeError('Reached unreachable code')

    def _get(self, *args, **kwargs):
        return self._request('GET', *args, **kwargs)

    def _post(self, *args, **kwargs):
        return self._request('POST', *args, **kwargs)

    @classmethod
    @abc.abstractmethod
    def setup_parser(cls, subparser):
        pass

    @classmethod
    @abc.abstractmethod
    def from_args(cls, args):
        pass


# Script file:
import bs4
import collections
import dataclasses
import datetime
import email.utils
import enum
import itertools
import json
import random
import logging
import re
import twitter_base
import string
import time
import typing
import urllib.parse


logger = logging.getLogger(__name__)
_API_AUTHORIZATION_HEADER = 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'


@dataclasses.dataclass
class Tweet(twitter_base.Item):
    """A single tweet. Annotations naming classes defined further down are quoted forward references."""

    url: str
    date: datetime.datetime
    content: str
    renderedContent: str
    id: int
    user: 'User'
    replyCount: int
    retweetCount: int
    likeCount: int
    quoteCount: int
    conversationId: int
    lang: str
    source: str
    sourceUrl: typing.Optional[str] = None
    sourceLabel: typing.Optional[str] = None
    outlinks: typing.Optional[typing.List[str]] = None
    tcooutlinks: typing.Optional[typing.List[str]] = None
    media: typing.Optional[typing.List['Medium']] = None
    retweetedTweet: typing.Optional['Tweet'] = None
    quotedTweet: typing.Optional['Tweet'] = None
    inReplyToTweetId: typing.Optional[int] = None
    inReplyToUser: typing.Optional['User'] = None
    mentionedUsers: typing.Optional[typing.List['User']] = None
    coordinates: typing.Optional['Coordinates'] = None
    place: typing.Optional['Place'] = None
    hashtags: typing.Optional[typing.List[str]] = None
    cashtags: typing.Optional[typing.List[str]] = None

    # Deprecated accessors; each emits a FutureWarning pointing at its replacement.
    username = twitter_base._DeprecatedProperty('username', lambda self: self.user.username, 'user.username')
    outlinksss = twitter_base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks) if self.outlinks else '', 'outlinks')
    tcooutlinksss = twitter_base._DeprecatedProperty('tcooutlinksss', lambda self: ' '.join(self.tcooutlinks) if self.tcooutlinks else '', 'tcooutlinks')

    def __str__(self):
        return self.url


class Medium:
    """Marker base class for media attached to a tweet."""
    pass


@dataclasses.dataclass
class Photo(Medium):
    previewUrl: str
    fullUrl: str


@dataclasses.dataclass
class VideoVariant:
    contentType: str
    url: str
    bitrate: typing.Optional[int]


@dataclasses.dataclass
class Video(Medium):
    thumbnailUrl: str
    variants: typing.List[VideoVariant]
    duration: float


@dataclasses.dataclass
class Gif(Medium):
    thumbnailUrl: str
    variants: typing.List[VideoVariant]


@dataclasses.dataclass
class DescriptionURL:
    text: typing.Optional[str]
    url: str
    tcourl: str
    indices: typing.Tuple[int, int]


@dataclasses.dataclass
class Coordinates:
    longitude: float
    latitude: float


@dataclasses.dataclass
class Place:
    fullName: str
    name: str
    type: str
    country: str
    countryCode: str


@dataclasses.dataclass
class User(twitter_base.Entity):
    """A Twitter account."""

    # Most fields can be None if they're not known.

    username: str
    id: int
    displayname: typing.Optional[str] = None
    description: typing.Optional[str] = None  # Description as it's displayed on the web interface with URLs replaced
    rawDescription: typing.Optional[str] = None  # Raw description with the URL(s) intact
    descriptionUrls: typing.Optional[typing.List[DescriptionURL]] = None
    verified: typing.Optional[bool] = None
    created: typing.Optional[datetime.datetime] = None
    followersCount: typing.Optional[int] = None
    friendsCount: typing.Optional[int] = None
    statusesCount: typing.Optional[int] = None
    favouritesCount: typing.Optional[int] = None
    listedCount: typing.Optional[int] = None
    mediaCount: typing.Optional[int] = None
    location: typing.Optional[str] = None
    protected: typing.Optional[bool] = None
    linkUrl: typing.Optional[str] = None
    linkTcourl: typing.Optional[str] = None
    profileImageUrl: typing.Optional[str] = None
    profileBannerUrl: typing.Optional[str] = None

    @property
    def url(self):
        return f'https://twitter.com/{self.username}'

    def __str__(self):
        return self.url


class ScrollDirection(enum.Enum):
    TOP = enum.auto()
    BOTTOM = enum.auto()
    BOTH = enum.auto()


class TwitterAPIScraper(twitter_base.Scraper):
    """Shared machinery for scrapers that talk to Twitter's web-client JSON API with a guest token."""

    def __init__(self, baseUrl, **kwargs):
        super().__init__(**kwargs)
        self._baseUrl = baseUrl
        self._guestToken = None
        self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
        self._apiHeaders = {
            'User-Agent': self._userAgent,
            'Authorization': _API_AUTHORIZATION_HEADER,
            'Referer': self._baseUrl,
            'Accept-Language': 'en-US,en;q=0.5',
        }

    def _ensure_guest_token(self, url = None):
        """Fetch a guest token from the base URL (or `url`) unless one is already held.

        The token is looked up in the page HTML and in the response cookies; a
        cookie value takes precedence. Raises ScraperException if neither has it.
        """
        if self._guestToken is not None:
            return
        logger.info('Retrieving guest token')
        r = self._get(self._baseUrl if url is None else url, headers = {'User-Agent': self._userAgent})
        if (match := re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+); Max-Age=10800; Domain=\.twitter\.com; Path=/; Secure"\);', r.text)):
            logger.debug('Found guest token in HTML')
            self._guestToken = match.group(1)
        if 'gt' in r.cookies:
            logger.debug('Found guest token in cookies')
            self._guestToken = r.cookies['gt']
        if self._guestToken:
            self._session.cookies.set('gt', self._guestToken, domain = '.twitter.com', path = '/', secure = True, expires = time.time() + 10800)
            self._apiHeaders['x-guest-token'] = self._guestToken
            return
        raise twitter_base.ScraperException('Unable to find guest token')

    def _unset_guest_token(self):
        """Forget the current guest token, its cookie and its request header."""
        self._guestToken = None
        del self._session.cookies['gt']
        # pop() instead of del: do not fail if the header was never set.
        self._apiHeaders.pop('x-guest-token', None)

    def _check_api_response(self, r):
        """Response validator for _request: returns (success, message)."""
        if r.status_code == 429:
            # Rate-limited: get a fresh guest token and let _request retry.
            self._unset_guest_token()
            self._ensure_guest_token()
            return False, 'rate-limited'
        if r.headers.get('content-type', '').replace(' ', '') != 'application/json;charset=utf-8':
            return False, 'content type is not JSON'
        if r.status_code != 200:
            return False, 'non-200 status code'
        return True, None

    def _get_api_data(self, endpoint, params):
        """GET `endpoint` with the API headers and return the decoded JSON object."""
        self._ensure_guest_token()
        r = self._get(endpoint, params = params, headers = self._apiHeaders, responseOkCallback = self._check_api_response)
        try:
            obj = r.json()
        except json.JSONDecodeError as e:
            raise twitter_base.ScraperException('Received invalid JSON from Twitter') from e
        return obj

    @staticmethod
    def _iter_timeline_entries(obj):
        """Yield every entry from the addEntries/replaceEntry instructions of a timeline response."""
        # No data format test, just a hard and loud crash if anything's wrong :-)
        for instruction in obj['timeline']['instructions']:
            if 'addEntries' in instruction:
                entries = instruction['addEntries']['entries']
            elif 'replaceEntry' in instruction:
                entries = [instruction['replaceEntry']['entry']]
            else:
                continue
            yield from entries

    def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = None, direction = ScrollDirection.BOTTOM):
        """Yield successive response objects from a paginated timeline endpoint."""
        # Iterate over endpoint with params/paginationParams, optionally starting from a cursor
        # Handles guest token extraction using the baseUrl passed to __init__ etc.
        # Order from params and paginationParams is preserved. To insert the cursor at a particular location, insert a 'cursor' key into paginationParams there (value is overwritten).
        # direction controls in which direction it should scroll from the initial response. BOTH equals TOP followed by BOTTOM.

        # Logic for dual scrolling: direction is set to top, but if the bottom cursor is found, bottomCursorAndStop is set accordingly.
        # Once the top pagination is exhausted, the bottomCursorAndStop is used and reset to None; it isn't set anymore after because the first entry condition will always be true for the bottom cursor.

        if cursor is None:
            reqParams = params
        else:
            reqParams = paginationParams.copy()
            reqParams['cursor'] = cursor
        bottomCursorAndStop = None
        if direction is ScrollDirection.TOP or direction is ScrollDirection.BOTH:
            cursorDir = 'top'
        else:
            cursorDir = 'bottom'
        stopOnEmptyResponse = False
        while True:
            logger.info(f'Retrieving scroll page {cursor}')
            obj = self._get_api_data(endpoint, reqParams)
            yield obj

            newCursor = None
            promptCursor = None
            newBottomCursorAndStop = None
            for entry in self._iter_timeline_entries(obj):
                entryId = entry['entryId']
                if entryId == f'sq-cursor-{cursorDir}' or entryId.startswith(f'cursor-{cursorDir}-'):
                    cursorObj = entry['content']['operation']['cursor']
                    newCursor = cursorObj['value']
                    if 'stopOnEmptyResponse' in cursorObj:
                        stopOnEmptyResponse = cursorObj['stopOnEmptyResponse']
                elif entryId.startswith('cursor-showMoreThreadsPrompt-'):  # E.g. 'offensive' replies button
                    promptCursor = entry['content']['operation']['cursor']['value']
                elif direction is ScrollDirection.BOTH and bottomCursorAndStop is None and (entryId == 'sq-cursor-bottom' or entryId.startswith('cursor-bottom-')):
                    cursorObj = entry['content']['operation']['cursor']
                    newBottomCursorAndStop = (cursorObj['value'], cursorObj.get('stopOnEmptyResponse', False))
            if bottomCursorAndStop is None and newBottomCursorAndStop is not None:
                bottomCursorAndStop = newBottomCursorAndStop
            if not newCursor or newCursor == cursor or (stopOnEmptyResponse and self._count_tweets(obj) == 0):
                # End of pagination
                if promptCursor is not None:
                    newCursor = promptCursor
                elif direction is ScrollDirection.BOTH and bottomCursorAndStop is not None:
                    cursorDir = 'bottom'
                    newCursor, stopOnEmptyResponse = bottomCursorAndStop
                    bottomCursorAndStop = None
                else:
                    break
            cursor = newCursor
            reqParams = paginationParams.copy()
            reqParams['cursor'] = cursor

    def _count_tweets(self, obj):
        """Count the tweet entries in a timeline response."""
        return sum(
            1
            for entry in self._iter_timeline_entries(obj)
            if entry['entryId'].startswith('sq-I-t-') or entry['entryId'].startswith('tweet-')
        )

    def _instructions_to_tweets(self, obj, includeConversationThreads = False):
        """Yield Tweet objects for all tweet entries (and optionally conversation thread items) in a response."""
        for entry in self._iter_timeline_entries(obj):
            entryId = entry['entryId']
            if entryId.startswith('sq-I-t-') or entryId.startswith('tweet-'):
                yield from self._instruction_tweet_entry_to_tweet(entryId, entry['content'], obj)
            elif includeConversationThreads and entryId.startswith('conversationThread-') and not entryId.endswith('-show_more_cursor'):
                for item in entry['content']['timelineModule']['items']:
                    if item['entryId'].startswith('tweet-'):
                        yield from self._instruction_tweet_entry_to_tweet(item['entryId'], item, obj)

    def _instruction_tweet_entry_to_tweet(self, entryId, entry, obj):
        """Yield the Tweet referenced by one timeline entry, or nothing if it is an ad, deleted or missing."""
        content = entry['item']['content']
        if 'tweet' in content:
            if 'promotedMetadata' in content['tweet']:  # Promoted tweet aka ads
                return
            if content['tweet']['id'] not in obj['globalObjects']['tweets']:
                logger.warning(f'Skipping tweet {entry["item"]["content"]["tweet"]["id"]} which is not in globalObjects')
                return
            tweet = obj['globalObjects']['tweets'][content['tweet']['id']]
        elif 'tombstone' in content:
            if 'tweet' not in content['tombstone']:  # E.g. deleted reply
                return
            if content['tombstone']['tweet']['id'] not in obj['globalObjects']['tweets']:
                logger.warning(f'Skipping tweet {entry["item"]["content"]["tombstone"]["tweet"]["id"]} which is not in globalObjects')
                return
            tweet = obj['globalObjects']['tweets'][content['tombstone']['tweet']['id']]
        else:
            raise twitter_base.ScraperException(f'Unable to handle entry {entryId!r}')
        yield self._tweet_to_tweet(tweet, obj)

    def _tweet_to_tweet(self, tweet, obj):
        # Transforms a Twitter API tweet object into a Tweet
        kwargs = {}
        kwargs['id'] = tweet['id'] if 'id' in tweet else int(tweet['id_str'])
        kwargs['content'] = tweet['full_text']
        kwargs['renderedContent'] = self._render_text_with_urls(tweet['full_text'], tweet['entities'].get('urls'))
        kwargs['user'] = self._user_to_user(obj['globalObjects']['users'][tweet['user_id_str']])
        kwargs['date'] = email.utils.parsedate_to_datetime(tweet['created_at'])
        if tweet['entities'].get('urls'):
            kwargs['outlinks'] = [u['expanded_url'] for u in tweet['entities']['urls']]
            kwargs['tcooutlinks'] = [u['url'] for u in tweet['entities']['urls']]
        kwargs['url'] = f'https://twitter.com/{obj["globalObjects"]["users"][tweet["user_id_str"]]["screen_name"]}/status/{kwargs["id"]}'
        kwargs['replyCount'] = tweet['reply_count']
        kwargs['retweetCount'] = tweet['retweet_count']
        kwargs['likeCount'] = tweet['favorite_count']
        kwargs['quoteCount'] = tweet['quote_count']
        kwargs['conversationId'] = tweet['conversation_id'] if 'conversation_id' in tweet else int(tweet['conversation_id_str'])
        kwargs['lang'] = tweet['lang']
        kwargs['source'] = tweet['source']
        # The source field is an HTML anchor; pull out its href and its label.
        if (match := re.search(r'href=[\'"]?([^\'" >]+)', tweet['source'])):
            kwargs['sourceUrl'] = match.group(1)
        if (match := re.search(r'>([^<]*)<', tweet['source'])):
            kwargs['sourceLabel'] = match.group(1)
        if 'extended_entities' in tweet and 'media' in tweet['extended_entities']:
            media = []
            for medium in tweet['extended_entities']['media']:
                if medium['type'] == 'photo':
                    if '.' not in medium['media_url_https']:
                        logger.warning(f'Skipping malformed medium URL on tweet {kwargs["id"]}: {medium["media_url_https"]!r} contains no dot')
                        continue
                    baseUrl, fmt = medium['media_url_https'].rsplit('.', 1)
                    if fmt not in ('jpg', 'png'):
                        logger.warning(f'Skipping photo with unknown format on tweet {kwargs["id"]}: {fmt!r}')
                        continue
                    media.append(Photo(
                        previewUrl = f'{baseUrl}?format={fmt}&name=small',
                        fullUrl = f'{baseUrl}?format={fmt}&name=large',
                    ))
                elif medium['type'] == 'video' or medium['type'] == 'animated_gif':
                    variants = [
                        VideoVariant(contentType = variant['content_type'], url = variant['url'], bitrate = variant.get('bitrate'))
                        for variant in medium['video_info']['variants']
                    ]
                    mKwargs = {
                        'thumbnailUrl': medium['media_url_https'],
                        'variants': variants,
                    }
                    if medium['type'] == 'video':
                        mKwargs['duration'] = medium['video_info']['duration_millis'] / 1000
                        cls = Video
                    elif medium['type'] == 'animated_gif':
                        cls = Gif
                    media.append(cls(**mKwargs))
            if media:
                kwargs['media'] = media
        if 'retweeted_status_id_str' in tweet:
            kwargs['retweetedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['retweeted_status_id_str']], obj)
        if 'quoted_status_id_str' in tweet and tweet['quoted_status_id_str'] in obj['globalObjects']['tweets']:
            kwargs['quotedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['quoted_status_id_str']], obj)
        if (inReplyToTweetId := tweet.get('in_reply_to_status_id_str')):
            kwargs['inReplyToTweetId'] = int(inReplyToTweetId)
            inReplyToUserId = int(tweet['in_reply_to_user_id_str'])
            if inReplyToUserId == kwargs['user'].id:
                kwargs['inReplyToUser'] = kwargs['user']
            elif tweet['entities'].get('user_mentions'):
                for u in tweet['entities']['user_mentions']:
                    if u['id_str'] == tweet['in_reply_to_user_id_str']:
                        kwargs['inReplyToUser'] = User(username = u['screen_name'], id = u['id'] if 'id' in u else int(u['id_str']), displayname = u['name'])
            if 'inReplyToUser' not in kwargs:
                # Fall back to the minimal information carried by the tweet itself.
                kwargs['inReplyToUser'] = User(username = tweet['in_reply_to_screen_name'], id = inReplyToUserId)
        if tweet['entities'].get('user_mentions'):
            kwargs['mentionedUsers'] = [
                User(username = u['screen_name'], id = u['id'] if 'id' in u else int(u['id_str']), displayname = u['name'])
                for u in tweet['entities']['user_mentions']
            ]

        # https://developer.twitter.com/en/docs/tutorials/filtering-tweets-by-location
        if tweet.get('coordinates'):
            # coordinates root key (if present) presents coordinates in the form [LONGITUDE, LATITUDE]
            if (coords := tweet['coordinates']['coordinates']) and len(coords) == 2:
                kwargs['coordinates'] = Coordinates(coords[0], coords[1])
        elif tweet.get('geo'):
            # geo root key (if present) presents coordinates in the form [LATITUDE, LONGITUDE]
            if (coords := tweet['geo']['coordinates']) and len(coords) == 2:
                kwargs['coordinates'] = Coordinates(coords[1], coords[0])
        if tweet.get('place'):
            kwargs['place'] = Place(tweet['place']['full_name'], tweet['place']['name'], tweet['place']['place_type'], tweet['place']['country'], tweet['place']['country_code'])
            if 'coordinates' not in kwargs and tweet['place']['bounding_box'] and (coords := tweet['place']['bounding_box']['coordinates']) and coords[0] and len(coords[0][0]) == 2:
                # Take the first (longitude, latitude) couple of the "place square"
                kwargs['coordinates'] = Coordinates(coords[0][0][0], coords[0][0][1])
        if tweet['entities'].get('hashtags'):
            kwargs['hashtags'] = [o['text'] for o in tweet['entities']['hashtags']]
        if tweet['entities'].get('symbols'):
            kwargs['cashtags'] = [o['text'] for o in tweet['entities']['symbols']]
        return Tweet(**kwargs)

    def _render_text_with_urls(self, text, urls):
        """Return `text` with each URL entity's span replaced by its display_url.

        `urls` is a list of entity dicts with an 'indices' pair and optionally a
        'display_url'. The entities are processed in left-to-right order
        regardless of the order they arrive in.
        """
        if not urls:
            return text
        urlsSorted = sorted(urls, key = lambda x: x['indices'][0])  # Ensure that they're in left to right appearance order
        assert all(url['indices'][1] <= nextUrl['indices'][0] for url, nextUrl in zip(urlsSorted, urlsSorted[1:])), 'broken URL indices'
        out = []
        out.append(text[:urlsSorted[0]['indices'][0]])
        for url, nextUrl in itertools.zip_longest(urlsSorted, urlsSorted[1:]):
            if 'display_url' in url:
                out.append(url['display_url'])
            # Text between this URL and the next one (or the end of the text).
            out.append(text[url['indices'][1] : nextUrl['indices'][0] if nextUrl is not None else None])
        return ''.join(out)

    def _user_to_user(self, user):
        """Transform a Twitter API user object into a User."""
        kwargs = {}
        kwargs['username'] = user['screen_name']
        kwargs['id'] = user['id'] if 'id' in user else int(user['id_str'])
        kwargs['displayname'] = user['name']
        kwargs['description'] = self._render_text_with_urls(user['description'], user['entities']['description'].get('urls'))
        kwargs['rawDescription'] = user['description']
        if user['entities']['description'].get('urls'):
            kwargs['descriptionUrls'] = [{'text': x.get('display_url'), 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in user['entities']['description']['urls']]
        kwargs['verified'] = user.get('verified')
        kwargs['created'] = email.utils.parsedate_to_datetime(user['created_at'])
        kwargs['followersCount'] = user['followers_count']
        kwargs['friendsCount'] = user['friends_count']
        kwargs['statusesCount'] = user['statuses_count']
        kwargs['favouritesCount'] = user['favourites_count']
        kwargs['listedCount'] = user['listed_count']
        kwargs['mediaCount'] = user['media_count']
        kwargs['location'] = user['location']
        kwargs['protected'] = user.get('protected')
        if 'url' in user['entities']:
            kwargs['linkUrl'] = (user['entities']['url']['urls'][0].get('expanded_url') or user.get('url'))
        kwargs['linkTcourl'] = user.get('url')
        kwargs['profileImageUrl'] = user['profile_image_url_https']
        kwargs['profileBannerUrl'] = user.get('profile_banner_url')
        return User(**kwargs)


class TwitterSearchScraper(TwitterAPIScraper):
    """Scraper for the results of a Twitter search query."""

    name = 'twitter-search'

    def __init__(self, query, cursor = None, top = False, **kwargs):
        super().__init__(baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': query, 'src': 'spelling_expansion_revert_click'}), **kwargs)
        self._query = query  # Note: may get replaced by subclasses when using user ID resolution
        self._cursor = cursor
        self._top = top

    def _check_scroll_response(self, r):
        if r.status_code == 429:
            # Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
            return True, None
        # Default to '' so a missing content-type header is reported instead of raising AttributeError.
        if r.headers.get('content-type', '').replace(' ', '') != 'application/json;charset=utf-8':
            return False, 'content type is not JSON'
        if r.status_code != 200:
            return False, 'non-200 status code'
        return True, None

    def get_items(self):
        """Yield the tweets matching the query, resuming from the cursor given at construction if any."""
        paginationParams = {
            'include_profile_interstitial_type': '1',
            'include_blocking': '1',
            'include_blocked_by': '1',
            'include_followed_by': '1',
            'include_want_retweets': '1',
            'include_mute_edge': '1',
            'include_can_dm': '1',
            'include_can_media_tag': '1',
            'skip_status': '1',
            'cards_platform': 'Web-12',
            'include_cards': '1',
            'include_ext_alt_text': 'true',
            'include_quote_count': 'true',
            'include_reply_count': '1',
            'tweet_mode': 'extended',
            'include_entities': 'true',
            'include_user_entities': 'true',
            'include_ext_media_color': 'true',
            'include_ext_media_availability': 'true',
            'send_error_codes': 'true',
            'simple_quoted_tweets': 'true',
            'q': self._query,
            'tweet_search_mode': 'live',
            'count': '100',
            'query_source': 'spelling_expansion_revert_click',
            'cursor': None,
            'pc': '1',
            'spelling_corrections': '1',
            'ext': 'ext=mediaStats%2ChighlightedLabel',
        }
        # The first request carries no cursor; later ones reuse paginationParams with it filled in.
        params = paginationParams.copy()
        del params['cursor']
        if self._top:
            del params['tweet_search_mode']
            del paginationParams['tweet_search_mode']

        # Pass the stored cursor so that --cursor actually resumes the scroll.
        for obj in self._iter_api_data('https://api.twitter.com/2/search/adaptive.json', params, paginationParams, cursor = self._cursor):
            yield from self._instructions_to_tweets(obj)

    @classmethod
    def setup_parser(cls, subparser):
        subparser.add_argument('--cursor', metavar = 'CURSOR')
        subparser.add_argument('--top', action = 'store_true', default = False, help = 'Enable fetching top tweets instead of live/chronological')
        subparser.add_argument('query', help = 'A Twitter search string')

    @classmethod
    def from_args(cls, args):
        return cls(args.query, cursor = args.cursor, top = args.top, retries = args.retries)


class TwitterUserScraper(TwitterSearchScraper):
    """Scraper for a user's tweets, implemented as a `from:<username>` search."""

    name = 'twitter-user'

    def __init__(self, username, isUserId, **kwargs):
        if not self.is_valid_username(username):
            raise ValueError('Invalid username')
        super().__init__(f'from:{username}', **kwargs)
        self._username = username
        self._isUserId = isUserId
        self._baseUrl = f'https://twitter.com/{self._username}' if not self._isUserId else f'https://twitter.com/i/user/{self._username}'

    def _get_entity(self):
        """Retrieve the account as a User, or None if the API returns no data for it."""
        self._ensure_guest_token()
        if not self._isUserId:
            fieldName = 'screen_name'
            endpoint = 'https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName'
        else:
            fieldName = 'userId'
            endpoint = 'https://twitter.com/i/api/graphql/WN6Hck-Pwm-YP0uxVj1oMQ/UserByRestIdWithoutResults'
        params = {'variables': json.dumps({fieldName: self._username, 'withHighlightedLabel': True}, separators = (',', ':'))}
        obj = self._get_api_data(endpoint, params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote))
        if not obj['data']:
            return None
        user = obj['data']['user']
        legacy = user['legacy']
        rawDescription = legacy['description']
        description = self._render_text_with_urls(rawDescription, legacy['entities']['description']['urls'])
        return User(
            username = legacy['screen_name'],
            id = user['rest_id'],
            displayname = legacy['name'],
            description = description,
            rawDescription = rawDescription,
            descriptionUrls = [{'text': x.get('display_url'), 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in legacy['entities']['description']['urls']],
            verified = legacy['verified'],
            created = email.utils.parsedate_to_datetime(legacy['created_at']),
            followersCount = legacy['followers_count'],
            friendsCount = legacy['friends_count'],
            statusesCount = legacy['statuses_count'],
            favouritesCount = legacy['favourites_count'],
            listedCount = legacy['listed_count'],
            mediaCount = legacy['media_count'],
            location = legacy['location'],
            protected = legacy['protected'],
            linkUrl = legacy['entities']['url']['urls'][0]['expanded_url'] if 'url' in legacy['entities'] else None,
            linkTcourl = legacy.get('url'),
            profileImageUrl = legacy['profile_image_url_https'],
            profileBannerUrl = legacy.get('profile_banner_url'),
        )

    def get_items(self):
        if self._isUserId:
            # Resolve user ID to username
            self._username = self.entity.username
            self._isUserId = False
            self._query = f'from:{self._username}'
        yield from super().get_items()

    @staticmethod
    def is_valid_username(s):
        """Accept 1-15 characters of letters, digits and underscore, or any non-empty all-digit string (a user ID)."""
        return (1 <= len(s) <= 15 and s.strip(string.ascii_letters + string.digits + '_') == '') or (s and s.strip(string.digits) == '')

    @classmethod
    def setup_parser(cls, subparser):
        def username(s):
            if cls.is_valid_username(s):
                return s
            raise ValueError('Invalid username')

        subparser.add_argument('--user-id', dest = 'isUserId', action = 'store_true', default = False, help = 'Use user ID instead of username')
        subparser.add_argument('username', type = username, help = 'A Twitter username (without @)')

    @classmethod
    def from_args(cls, args):
        return cls(args.username, args.isUserId, retries = args.retries)


class TwitterProfileScraper(TwitterUserScraper):
    """Scraper for a user's profile timeline (includes replies)."""

    name = 'twitter-profile'

    def get_items(self):
        if not self._isUserId:
            userId = self.entity.id
        else:
            userId = self._username
        paginationParams = {
            'include_profile_interstitial_type': '1',
            'include_blocking': '1',
            'include_blocked_by': '1',
            'include_followed_by': '1',
            'include_want_retweets': '1',
            'include_mute_edge': '1',
            'include_can_dm': '1',
            'include_can_media_tag': '1',
            'skip_status': '1',
            'cards_platform': 'Web-12',
            'include_cards': '1',
            'include_ext_alt_text': 'true',
            'include_quote_count': 'true',
            'include_reply_count': '1',
            'tweet_mode': 'extended',
            'include_entities': 'true',
            'include_user_entities': 'true',
            'include_ext_media_color': 'true',
            'include_ext_media_availability': 'true',
            'send_error_codes': 'true',
            'simple_quoted_tweets': 'true',
            'include_tweet_replies': 'true',
            'userId': userId,
            'count': '100',
            'cursor': None,
            'ext': 'ext=mediaStats%2ChighlightedLabel',
        }
        params = paginationParams.copy()
        del params['cursor']

        for obj in self._iter_api_data(f'https://api.twitter.com/2/timeline/profile/{userId}.json', params, paginationParams):
            yield from self._instructions_to_tweets(obj)


class TwitterHashtagScraper(TwitterSearchScraper):
    """Scraper for a hashtag, implemented as a `#<hashtag>` search."""

    name = 'twitter-hashtag'

    def __init__(self, hashtag, **kwargs):
        super().__init__(f'#{hashtag}', **kwargs)
        self._hashtag = hashtag

    @classmethod
    def setup_parser(cls, subparser):
        subparser.add_argument('hashtag', help = 'A Twitter hashtag (without #)')

    @classmethod
    def from_args(cls, args):
        return cls(args.hashtag, retries = args.retries)


class TwitterTweetScraperMode(enum.Enum):
    SINGLE = 'single'
    SCROLL = 'scroll'
    RECURSE = 'recurse'

    @classmethod
    def from_args(cls, args):
        if args.scroll:
            return cls.SCROLL
        if args.recurse:
            return cls.RECURSE
        return cls.SINGLE


class TwitterTweetScraper(TwitterAPIScraper):
    """Scraper for a single tweet, optionally with its conversation (scroll) or all reachable replies (recurse)."""

    name = 'twitter-tweet'

    def __init__(self, tweetId, mode, **kwargs):
        self._tweetId = tweetId
        self._mode = mode
        super().__init__(f'https://twitter.com/i/web/{self._tweetId}', **kwargs)

    def get_items(self):
        paginationParams = {
            'include_profile_interstitial_type': '1',
            'include_blocking': '1',
            'include_blocked_by': '1',
            'include_followed_by': '1',
            'include_want_retweets': '1',
            'include_mute_edge': '1',
            'include_can_dm': '1',
            'include_can_media_tag': '1',
            'skip_status': '1',
            'cards_platform': 'Web-12',
            'include_cards': '1',
            'include_ext_alt_text': 'true',
            'include_quote_count': 'true',
            'include_reply_count': '1',
            'tweet_mode': 'extended',
            'include_entities': 'true',
            'include_user_entities': 'true',
            'include_ext_media_color': 'true',
            'include_ext_media_availability': 'true',
            'send_error_codes': 'true',
            'simple_quoted_tweet': 'true',
            'count': '20',
            'cursor': None,
            'include_ext_has_birdwatch_notes': 'false',
            'ext': 'mediaStats%2ChighlightedLabel',
        }
        params = paginationParams.copy()
        del params['cursor']
        if self._mode is TwitterTweetScraperMode.SINGLE:
            obj = self._get_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params)
            yield self._tweet_to_tweet(obj['globalObjects']['tweets'][str(self._tweetId)], obj)
        elif self._mode is TwitterTweetScraperMode.SCROLL:
            for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params, paginationParams, direction = ScrollDirection.BOTH):
                yield from self._instructions_to_tweets(obj, includeConversationThreads = True)
        elif self._mode is TwitterTweetScraperMode.RECURSE:
            # Breadth-first walk over conversations: every unseen tweet with replies is queued for its own scroll.
            seenTweets = set()
            queue = collections.deque()
            queue.append(self._tweetId)
            while queue:
                tweetId = queue.popleft()
                for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{tweetId}.json', params, paginationParams, direction = ScrollDirection.BOTH):
                    for tweet in self._instructions_to_tweets(obj, includeConversationThreads = True):
                        if tweet.id not in seenTweets:
                            yield tweet
                            seenTweets.add(tweet.id)
                            if tweet.replyCount:
                                queue.append(tweet.id)

    @classmethod
    def setup_parser(cls, subparser):
        group = subparser.add_mutually_exclusive_group(required = False)
        group.add_argument('--scroll', action = 'store_true', default = False, help = 'Enable scrolling in both directions')
        group.add_argument('--recurse', '--recursive', action = 'store_true', default = False, help = 'Enable recursion through all tweets encountered (warning: slow, potentially memory-intensive!)')
        subparser.add_argument('tweetId', type = int, help = 'A tweet ID')

    @classmethod
    def from_args(cls, args):
        return cls(args.tweetId, TwitterTweetScraperMode.from_args(args), retries = args.retries)


class TwitterListPostsScraper(TwitterSearchScraper):
    """Scraper for the posts of a Twitter list, implemented as a `list:<name>` search."""

    name = 'twitter-list-posts'

    def __init__(self, listName, **kwargs):
        super().__init__(f'list:{listName}', **kwargs)
        self._listName = listName

    @classmethod
    def setup_parser(cls, subparser):
        subparser.add_argument('list', help = 'A Twitter list ID or a string of the form "username/listname" (replace spaces with dashes)')

    @classmethod
    def from_args(cls, args):
        return cls(args.list, retries = args.retries)


# Usage note: place this script in the scrape folder and import the base file (twitter_base) directly.
需要用到API,以neuro_scand这个账号为例,路径也需要自己修改
# Crawl the followers of an account; each page returns 20 records.
# The only value that needs changing is the user_name passed in first; nothing else is affected.
import os

import tweepy
import csv

# Accumulates every follower object fetched by get_list(); read by read_list().
ids = []


def get_list(user_name):
    """Fetch all followers of `user_name` page by page into the module-level `ids` list.

    Returns the same `ids` list. Credentials are taken from the environment
    when set, falling back to the literals that were embedded in the script.
    """
    user_id = user_name
    # NOTE(review): credentials embedded in source; prefer supplying them through
    # the environment variables below and removing the literal fallbacks.
    consumer_key = os.environ.get('TWITTER_CONSUMER_KEY', 'e1232137p')
    consumer_secret = os.environ.get('TWITTER_CONSUMER_SECRET', 'bUK4lzM123cWT2LiVUQHuCRDqMx5')
    access_token = os.environ.get('TWITTER_ACCESS_TOKEN', '3388759955-3yNxjWBNns23QfQyOZ9uvZvsN9brZg')
    access_token_secret = os.environ.get('TWITTER_ACCESS_TOKEN_SECRET', 'R8TGCOVGhNn123LmiYTg091Pd10vhpks')
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)
    print(api)
    # tweepy 4 renamed API.followers to API.get_followers; support both.
    fetch_followers = getattr(api, 'get_followers', None) or api.followers
    for page in tweepy.Cursor(fetch_followers, screen_name=user_id).pages():
        print("起头计数")
        ids.extend(page)
        print(len(ids))
    return ids


get_list('neuro_scand')
print(len(ids))


def read_list(name):
    """Append name, location and description of every fetched follower to `<name>.csv`.

    A header row is written on every call because the file is opened in append mode.
    """
    # newline='' is what the csv module expects; `with` guarantees the file is closed.
    with open('/root/scrapy_medical/neuro36/' + name + '.csv', 'a+', encoding='utf-8', newline='') as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow(["姓名", "地点", "描述"])
        for user in ids:
            # f-strings instead of '+' so a missing (None) field cannot raise TypeError.
            print(f'Name - {user.name}')
            print(f'Bio - {user.description}')
            print(f'Location - {user.location}')
            csv_writer.writerow([user.name, user.location, user.description])


read_list("neuro_scand")

# How to crawl information by geographic location:
链接: https://pan.baidu.com/s/1MlFF7zbTr_nt8xFJPpkzDg
提取码: wv71 复制这段内容后打开百度网盘手机App,操作更方便哦