A comprehensive guide to Twitter scraping

Posted by zaibaike (admin), 1 month ago (01-07 01:47)

This post is fairly long. It covers, but is not limited to, the following:

1. All tweets posted by a given user
2. Tweets containing a given keyword
3. A user's followers and following, and, one level further, the followers' own followers/following
4. Scraping very old historical tweets by popularity and retweet count
5. Scraping tweets by coordinates and geographic location (longitude/latitude, etc.)
6. Scraping a tweet's comments/replies and information about the repliers
7. Using a bot to reply to other people's tweets

......

Without further ado, here is the code; it is also shared via Baidu Netdisk.

1. How to scrape all tweets posted by a given user

The base file (save it as twitter_base.py; the scraper script below imports it under that name):

import abc
import copy
import dataclasses
import datetime
import functools
import json
import logging
import requests
import time
import warnings


logger = logging.getLogger(__name__)


class _DeprecatedProperty:
    def __init__(self, name, repl, replStr):
        self.name = name
        self.repl = repl
        self.replStr = replStr

    def __get__(self, obj, objType):
        if obj is None: # if the access is through the class using _DeprecatedProperty rather than an instance of the class:
            return self
        warnings.warn(f'{self.name} is deprecated, use {self.replStr} instead', FutureWarning, stacklevel = 2)
        return self.repl(obj)


def _json_serialise_datetime(obj):
    '''A JSON serialiser that converts datetime.datetime and datetime.date objects to ISO-8601 strings.'''
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    raise TypeError(f'Object of type {type(obj)} is not JSON serializable')


def _json_dataclass_to_dict(obj):
    if isinstance(obj, _JSONDataclass) or dataclasses.is_dataclass(obj):
        out = {}
        out['_type'] = f'{type(obj).__module__}.{type(obj).__name__}'
        for field in dataclasses.fields(obj):
            assert field.name != '_type'
            out[field.name] = _json_dataclass_to_dict(getattr(obj, field.name))
        # Add in (non-deprecated) properties
        for k in dir(obj):
            if isinstance(getattr(type(obj), k, None), property):
                assert k != '_type'
                out[k] = _json_dataclass_to_dict(getattr(obj, k))
        return out
    elif isinstance(obj, (tuple, list)):
        return type(obj)(_json_dataclass_to_dict(x) for x in obj)
    elif isinstance(obj, dict):
        return {_json_dataclass_to_dict(k): _json_dataclass_to_dict(v) for k, v in obj.items()}
    elif isinstance(obj, set):
        return {_json_dataclass_to_dict(v) for v in obj}
    else:
        return copy.deepcopy(obj)


@dataclasses.dataclass
class _JSONDataclass:
    '''A base class for dataclasses for conversion to JSON'''

    def json(self):
        '''Convert the object to a JSON string'''
        out = _json_dataclass_to_dict(self)
        for key, value in list(out.items()): # Modifying the dict below, so make a copy first
            if isinstance(value, IntWithGranularity):
                out[key] = int(value)
                assert f'{key}.granularity' not in out, f'Granularity collision on {key}.granularity'
                out[f'{key}.granularity'] = value.granularity
        return json.dumps(out, default = _json_serialise_datetime)


@dataclasses.dataclass
class Item(_JSONDataclass):
    '''An abstract base class for an item returned by the scraper's get_items generator.

    An item can really be anything. The string representation should be useful for the CLI output (e.g. a direct URL for the item).'''

    @abc.abstractmethod
    def __str__(self):
        pass


@dataclasses.dataclass
class Entity(_JSONDataclass):
    '''An abstract base class for an entity returned by the scraper's entity property.

    An entity is typically the account of a person or organisation. The string representation should be the preferred direct URL to the entity's page on the network.'''

    @abc.abstractmethod
    def __str__(self):
        pass


class IntWithGranularity(int):
    '''A number with an associated granularity

    For example, an IntWithGranularity(42000, 1000) represents a number on the order of 42000 with two significant digits, i.e. something counted with a granularity of 1000.'''

    def __new__(cls, value, granularity, *args, **kwargs):
        obj = super().__new__(cls, value, *args, **kwargs)
        obj.granularity = granularity
        return obj

    def __reduce__(self):
        return (IntWithGranularity, (int(self), self.granularity))


class URLItem(Item):
    '''A generic item which only holds a URL string.'''

    def __init__(self, url):
        self._url = url

    @property
    def url(self):
        return self._url

    def __str__(self):
        return self._url


class ScraperException(Exception):
    pass


class Scraper:
    '''An abstract base class for a scraper.'''

    name = None

    def __init__(self, retries = 3):
        self._retries = retries
        self._session = requests.Session()

    @abc.abstractmethod
    def get_items(self):
        '''Iterator yielding Items.'''
        pass

    def _get_entity(self):
        '''Get the entity behind the scraper, if any.

        This is the method implemented by subclasses for doing the actual retrieval/entity object creation. For accessing the scraper's entity, use the entity property.'''
        return None

    @functools.cached_property
    def entity(self):
        return self._get_entity()

    def _request(self, method, url, params = None, data = None, headers = None, timeout = 10, responseOkCallback = None, allowRedirects = True):
        for attempt in range(self._retries + 1):
            # The request is newly prepared on each retry because of potential cookie updates.
            req = self._session.prepare_request(requests.Request(method, url, params = params, data = data, headers = headers))
            logger.info(f'Retrieving {req.url}')
            logger.debug(f'... with headers: {headers!r}')
            if data:
                logger.debug(f'... with data: {data!r}')
            try:
                r = self._session.send(req, allow_redirects = allowRedirects, timeout = timeout)
            except requests.exceptions.RequestException as exc:
                if attempt < self._retries:
                    retrying = ', retrying'
                    level = logging.INFO
                else:
                    retrying = ''
                    level = logging.ERROR
                logger.log(level, f'Error retrieving {req.url}: {exc!r}{retrying}')
            else:
                if responseOkCallback is not None:
                    success, msg = responseOkCallback(r)
                else:
                    success, msg = (True, None)
                msg = f': {msg}' if msg else ''
                if success:
                    logger.debug(f'{req.url} retrieved successfully{msg}')
                    return r
                else:
                    if attempt < self._retries:
                        retrying = ', retrying'
                        level = logging.INFO
                    else:
                        retrying = ''
                        level = logging.ERROR
                    logger.log(level, f'Error retrieving {req.url}{msg}{retrying}')
            if attempt < self._retries:
                sleepTime = 1.0 * 2**attempt # exponential backoff: sleep 1 second after first attempt, 2 after second, 4 after third, etc.
                logger.info(f'Waiting {sleepTime:.0f} seconds')
                time.sleep(sleepTime)
        else: # this else belongs to the for loop: it runs when all attempts are exhausted
            msg = f'{self._retries + 1} requests to {req.url} failed, giving up.'
            logger.fatal(msg)
            raise ScraperException(msg)
        raise RuntimeError('Reached unreachable code')

    def _get(self, *args, **kwargs):
        return self._request('GET', *args, **kwargs)

    def _post(self, *args, **kwargs):
        return self._request('POST', *args, **kwargs)

    @classmethod
    @abc.abstractmethod
    def setup_parser(cls, subparser):
        pass

    @classmethod
    @abc.abstractmethod
    def from_args(cls, args):
        pass
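
Before moving on to the Twitter-specific script, here is a minimal sketch (my own example, not part of the original post) of how this base module is meant to be used: subclass Scraper, yield Item objects from get_items(), and let _get() handle the retries with exponential backoff. ExampleScraper and the example.com URL are placeholders; the only assumption is that the file above was saved as twitter_base.py.

import twitter_base

class ExampleScraper(twitter_base.Scraper):
    name = 'example'

    def __init__(self, url, **kwargs):
        super().__init__(**kwargs)
        self._url = url

    def get_items(self):
        # _get() retries up to self._retries times with exponential backoff
        r = self._get(self._url)
        # every Item (here the generic URLItem) can be serialised with .json()
        yield twitter_base.URLItem(r.url)

if __name__ == '__main__':
    for item in ExampleScraper('https://example.com').get_items():
        print(item, item.json())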

The scraper script file:

import bs4
import collections
import dataclasses
import datetime
import email.utils
import enum
import itertools
import json
import random
import logging
import re
import twitter_base
import string
import time
import typing
import urllib.parse


logger = logging.getLogger(__name__)

_API_AUTHORIZATION_HEADER = 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'


@dataclasses.dataclass
class Tweet(twitter_base.Item):
    url: str
    date: datetime.datetime
    content: str
    renderedContent: str
    id: int
    user: 'User'
    replyCount: int
    retweetCount: int
    likeCount: int
    quoteCount: int
    conversationId: int
    lang: str
    source: str
    sourceUrl: typing.Optional[str] = None
    sourceLabel: typing.Optional[str] = None
    outlinks: typing.Optional[typing.List[str]] = None
    tcooutlinks: typing.Optional[typing.List[str]] = None
    media: typing.Optional[typing.List['Medium']] = None
    retweetedTweet: typing.Optional['Tweet'] = None
    quotedTweet: typing.Optional['Tweet'] = None
    inReplyToTweetId: typing.Optional[int] = None
    inReplyToUser: typing.Optional['User'] = None
    mentionedUsers: typing.Optional[typing.List['User']] = None
    coordinates: typing.Optional['Coordinates'] = None
    place: typing.Optional['Place'] = None
    hashtags: typing.Optional[typing.List[str]] = None
    cashtags: typing.Optional[typing.List[str]] = None

    username = twitter_base._DeprecatedProperty('username', lambda self: self.user.username, 'user.username')
    outlinksss = twitter_base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks) if self.outlinks else '', 'outlinks')
    tcooutlinksss = twitter_base._DeprecatedProperty('tcooutlinksss', lambda self: ' '.join(self.tcooutlinks) if self.tcooutlinks else '', 'tcooutlinks')

    def __str__(self):
        return self.url


class Medium:
    pass


@dataclasses.dataclass
class Photo(Medium):
    previewUrl: str
    fullUrl: str


@dataclasses.dataclass
class VideoVariant:
    contentType: str
    url: str
    bitrate: typing.Optional[int]


@dataclasses.dataclass
class Video(Medium):
    thumbnailUrl: str
    variants: typing.List[VideoVariant]
    duration: float


@dataclasses.dataclass
class Gif(Medium):
    thumbnailUrl: str
    variants: typing.List[VideoVariant]


@dataclasses.dataclass
class DescriptionURL:
    text: typing.Optional[str]
    url: str
    tcourl: str
    indices: typing.Tuple[int, int]


@dataclasses.dataclass
class Coordinates:
    longitude: float
    latitude: float


@dataclasses.dataclass
class Place:
    fullName: str
    name: str
    type: str
    country: str
    countryCode: str


@dataclasses.dataclass
class User(twitter_base.Entity):
    # Most fields can be None if they're not known.

    username: str
    id: int
    displayname: typing.Optional[str] = None
    description: typing.Optional[str] = None # Description as it's displayed on the web interface with URLs replaced
    rawDescription: typing.Optional[str] = None # Raw description with the URL(s) intact
    descriptionUrls: typing.Optional[typing.List[DescriptionURL]] = None
    verified: typing.Optional[bool] = None
    created: typing.Optional[datetime.datetime] = None
    followersCount: typing.Optional[int] = None
    friendsCount: typing.Optional[int] = None
    statusesCount: typing.Optional[int] = None
    favouritesCount: typing.Optional[int] = None
    listedCount: typing.Optional[int] = None
    mediaCount: typing.Optional[int] = None
    location: typing.Optional[str] = None
    protected: typing.Optional[bool] = None
    linkUrl: typing.Optional[str] = None
    linkTcourl: typing.Optional[str] = None
    profileImageUrl: typing.Optional[str] = None
    profileBannerUrl: typing.Optional[str] = None

    @property
    def url(self):
        return f'https://twitter.com/{self.username}'

    def __str__(self):
        return self.url


class ScrollDirection(enum.Enum):
    TOP = enum.auto()
    BOTTOM = enum.auto()
    BOTH = enum.auto()


class TwitterAPIScraper(twitter_base.Scraper):
    def __init__(self, baseUrl, **kwargs):
        super().__init__(**kwargs)
        self._baseUrl = baseUrl
        self._guestToken = None
        self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
        self._apiHeaders = {
            'User-Agent': self._userAgent,
            'Authorization': _API_AUTHORIZATION_HEADER,
            'Referer': self._baseUrl,
            'Accept-Language': 'en-US,en;q=0.5',
        }

    def _ensure_guest_token(self, url = None):
        if self._guestToken is not None:
            return
        logger.info('Retrieving guest token')
        r = self._get(self._baseUrl if url is None else url, headers = {'User-Agent': self._userAgent})
        if (match := re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+); Max-Age=10800; Domain=\.twitter\.com; Path=/; Secure"\);', r.text)):
            logger.debug('Found guest token in HTML')
            self._guestToken = match.group(1)
        if 'gt' in r.cookies:
            logger.debug('Found guest token in cookies')
            self._guestToken = r.cookies['gt']
        if self._guestToken:
            self._session.cookies.set('gt', self._guestToken, domain = '.twitter.com', path = '/', secure = True, expires = time.time() + 10800)
            self._apiHeaders['x-guest-token'] = self._guestToken
            return
        raise twitter_base.ScraperException('Unable to find guest token')

    def _unset_guest_token(self):
        self._guestToken = None
        del self._session.cookies['gt']
        del self._apiHeaders['x-guest-token']

    def _check_api_response(self, r):
        if r.status_code == 429:
            self._unset_guest_token()
            self._ensure_guest_token()
            return False, 'rate-limited'
        if r.headers.get('content-type', '').replace(' ', '') != 'application/json;charset=utf-8':
            return False, 'content type is not JSON'
        if r.status_code != 200:
            return False, 'non-200 status code'
        return True, None

    def _get_api_data(self, endpoint, params):
        self._ensure_guest_token()
        r = self._get(endpoint, params = params, headers = self._apiHeaders, responseOkCallback = self._check_api_response)
        try:
            obj = r.json()
        except json.JSONDecodeError as e:
            raise twitter_base.ScraperException('Received invalid JSON from Twitter') from e
        return obj

    def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = None, direction = ScrollDirection.BOTTOM):
        # Iterate over endpoint with params/paginationParams, optionally starting from a cursor.
        # Handles guest token extraction using the baseUrl passed to __init__ etc.
        # Order from params and paginationParams is preserved. To insert the cursor at a particular location, insert a 'cursor' key into paginationParams there (value is overwritten).
        # direction controls in which direction it should scroll from the initial response. BOTH equals TOP followed by BOTTOM.

        # Logic for dual scrolling: direction is set to top, but if the bottom cursor is found, bottomCursorAndStop is set accordingly.
        # Once the top pagination is exhausted, the bottomCursorAndStop is used and reset to None; it isn't set anymore after because the first entry condition will always be true for the bottom cursor.

        if cursor is None:
            reqParams = params
        else:
            reqParams = paginationParams.copy()
            reqParams['cursor'] = cursor
        bottomCursorAndStop = None
        if direction is ScrollDirection.TOP or direction is ScrollDirection.BOTH:
            dir = 'top'
        else:
            dir = 'bottom'
        stopOnEmptyResponse = False
        while True:
            logger.info(f'Retrieving scroll page {cursor}')
            obj = self._get_api_data(endpoint, reqParams)
            yield obj

            # No data format test, just a hard and loud crash if anything's wrong :-)
            newCursor = None
            promptCursor = None
            newBottomCursorAndStop = None
            for instruction in obj['timeline']['instructions']:
                if 'addEntries' in instruction:
                    entries = instruction['addEntries']['entries']
                elif 'replaceEntry' in instruction:
                    entries = [instruction['replaceEntry']['entry']]
                else:
                    continue
                for entry in entries:
                    if entry['entryId'] == f'sq-cursor-{dir}' or entry['entryId'].startswith(f'cursor-{dir}-'):
                        newCursor = entry['content']['operation']['cursor']['value']
                        if 'stopOnEmptyResponse' in entry['content']['operation']['cursor']:
                            stopOnEmptyResponse = entry['content']['operation']['cursor']['stopOnEmptyResponse']
                    elif entry['entryId'].startswith('cursor-showMoreThreadsPrompt-'): # E.g. offensive replies button
                        promptCursor = entry['content']['operation']['cursor']['value']
                    elif direction is ScrollDirection.BOTH and bottomCursorAndStop is None and (entry['entryId'] == 'sq-cursor-bottom' or entry['entryId'].startswith('cursor-bottom-')):
                        newBottomCursorAndStop = (entry['content']['operation']['cursor']['value'], entry['content']['operation']['cursor'].get('stopOnEmptyResponse', False))
            if bottomCursorAndStop is None and newBottomCursorAndStop is not None:
                bottomCursorAndStop = newBottomCursorAndStop
            if not newCursor or newCursor == cursor or (stopOnEmptyResponse and self._count_tweets(obj) == 0):
                # End of pagination
                if promptCursor is not None:
                    newCursor = promptCursor
                elif direction is ScrollDirection.BOTH and bottomCursorAndStop is not None:
                    dir = 'bottom'
                    newCursor, stopOnEmptyResponse = bottomCursorAndStop
                    bottomCursorAndStop = None
                else:
                    break
            cursor = newCursor
            reqParams = paginationParams.copy()
            reqParams['cursor'] = cursor

    def _count_tweets(self, obj):
        count = 0
        for instruction in obj['timeline']['instructions']:
            if 'addEntries' in instruction:
                entries = instruction['addEntries']['entries']
            elif 'replaceEntry' in instruction:
                entries = [instruction['replaceEntry']['entry']]
            else:
                continue
            for entry in entries:
                if entry['entryId'].startswith('sq-I-t-') or entry['entryId'].startswith('tweet-'):
                    count += 1
        return count

    def _instructions_to_tweets(self, obj, includeConversationThreads = False):
        # No data format test, just a hard and loud crash if anything's wrong :-)
        for instruction in obj['timeline']['instructions']:
            if 'addEntries' in instruction:
                entries = instruction['addEntries']['entries']
            elif 'replaceEntry' in instruction:
                entries = [instruction['replaceEntry']['entry']]
            else:
                continue
            for entry in entries:
                if entry['entryId'].startswith('sq-I-t-') or entry['entryId'].startswith('tweet-'):
                    yield from self._instruction_tweet_entry_to_tweet(entry['entryId'], entry['content'], obj)
                elif includeConversationThreads and entry['entryId'].startswith('conversationThread-') and not entry['entryId'].endswith('-show_more_cursor'):
                    for item in entry['content']['timelineModule']['items']:
                        if item['entryId'].startswith('tweet-'):
                            yield from self._instruction_tweet_entry_to_tweet(item['entryId'], item, obj)

    def _instruction_tweet_entry_to_tweet(self, entryId, entry, obj):
        if 'tweet' in entry['item']['content']:
            if 'promotedMetadata' in entry['item']['content']['tweet']: # Promoted tweet aka ads
                return
            if entry['item']['content']['tweet']['id'] not in obj['globalObjects']['tweets']:
                logger.warning(f'Skipping tweet {entry["item"]["content"]["tweet"]["id"]} which is not in globalObjects')
                return
            tweet = obj['globalObjects']['tweets'][entry['item']['content']['tweet']['id']]
        elif 'tombstone' in entry['item']['content']:
            if 'tweet' not in entry['item']['content']['tombstone']: # E.g. deleted reply
                return
            if entry['item']['content']['tombstone']['tweet']['id'] not in obj['globalObjects']['tweets']:
                logger.warning(f'Skipping tweet {entry["item"]["content"]["tombstone"]["tweet"]["id"]} which is not in globalObjects')
                return
            tweet = obj['globalObjects']['tweets'][entry['item']['content']['tombstone']['tweet']['id']]
        else:
            raise twitter_base.ScraperException(f'Unable to handle entry {entryId!r}')
        yield self._tweet_to_tweet(tweet, obj)

    def _tweet_to_tweet(self, tweet, obj):
        # Transforms a Twitter API tweet object into a Tweet
        kwargs = {}
        kwargs['id'] = tweet['id'] if 'id' in tweet else int(tweet['id_str'])
        kwargs['content'] = tweet['full_text']
        kwargs['renderedContent'] = self._render_text_with_urls(tweet['full_text'], tweet['entities'].get('urls'))
        kwargs['user'] = self._user_to_user(obj['globalObjects']['users'][tweet['user_id_str']])
        kwargs['date'] = email.utils.parsedate_to_datetime(tweet['created_at'])
        if tweet['entities'].get('urls'):
            kwargs['outlinks'] = [u['expanded_url'] for u in tweet['entities']['urls']]
            kwargs['tcooutlinks'] = [u['url'] for u in tweet['entities']['urls']]
        kwargs['url'] = f'https://twitter.com/{obj["globalObjects"]["users"][tweet["user_id_str"]]["screen_name"]}/status/{kwargs["id"]}'
        kwargs['replyCount'] = tweet['reply_count']
        kwargs['retweetCount'] = tweet['retweet_count']
        kwargs['likeCount'] = tweet['favorite_count']
        kwargs['quoteCount'] = tweet['quote_count']
        kwargs['conversationId'] = tweet['conversation_id'] if 'conversation_id' in tweet else int(tweet['conversation_id_str'])
        kwargs['lang'] = tweet['lang']
        kwargs['source'] = tweet['source']
        if (match := re.search(r'href=[\'"]?([^\'" >]+)', tweet['source'])):
            kwargs['sourceUrl'] = match.group(1)
        if (match := re.search(r'>([^<]*)<', tweet['source'])):
            kwargs['sourceLabel'] = match.group(1)
        if 'extended_entities' in tweet and 'media' in tweet['extended_entities']:
            media = []
            for medium in tweet['extended_entities']['media']:
                if medium['type'] == 'photo':
                    if '.' not in medium['media_url_https']:
                        logger.warning(f'Skipping malformed medium URL on tweet {kwargs["id"]}: {medium["media_url_https"]!r} contains no dot')
                        continue
                    baseUrl, format = medium['media_url_https'].rsplit('.', 1)
                    if format not in ('jpg', 'png'):
                        logger.warning(f'Skipping photo with unknown format on tweet {kwargs["id"]}: {format!r}')
                        continue
                    media.append(Photo(
                        previewUrl = f'{baseUrl}?format={format}&name=small',
                        fullUrl = f'{baseUrl}?format={format}&name=large',
                    ))
                elif medium['type'] == 'video' or medium['type'] == 'animated_gif':
                    variants = []
                    for variant in medium['video_info']['variants']:
                        variants.append(VideoVariant(contentType = variant['content_type'], url = variant['url'], bitrate = variant.get('bitrate')))
                    mKwargs = {
                        'thumbnailUrl': medium['media_url_https'],
                        'variants': variants,
                    }
                    if medium['type'] == 'video':
                        mKwargs['duration'] = medium['video_info']['duration_millis'] / 1000
                        cls = Video
                    elif medium['type'] == 'animated_gif':
                        cls = Gif
                    media.append(cls(**mKwargs))
            if media:
                kwargs['media'] = media
        if 'retweeted_status_id_str' in tweet:
            kwargs['retweetedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['retweeted_status_id_str']], obj)
        if 'quoted_status_id_str' in tweet and tweet['quoted_status_id_str'] in obj['globalObjects']['tweets']:
            kwargs['quotedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['quoted_status_id_str']], obj)
        if (inReplyToTweetId := tweet.get('in_reply_to_status_id_str')):
            kwargs['inReplyToTweetId'] = int(inReplyToTweetId)
            inReplyToUserId = int(tweet['in_reply_to_user_id_str'])
            if inReplyToUserId == kwargs['user'].id:
                kwargs['inReplyToUser'] = kwargs['user']
            elif tweet['entities'].get('user_mentions'):
                for u in tweet['entities']['user_mentions']:
                    if u['id_str'] == tweet['in_reply_to_user_id_str']:
                        kwargs['inReplyToUser'] = User(username = u['screen_name'], id = u['id'] if 'id' in u else int(u['id_str']), displayname = u['name'])
            if 'inReplyToUser' not in kwargs:
                kwargs['inReplyToUser'] = User(username = tweet['in_reply_to_screen_name'], id = inReplyToUserId)
        if tweet['entities'].get('user_mentions'):
            kwargs['mentionedUsers'] = [User(username = u['screen_name'], id = u['id'] if 'id' in u else int(u['id_str']), displayname = u['name']) for u in tweet['entities']['user_mentions']]
        # https://developer.twitter.com/en/docs/tutorials/filtering-tweets-by-location
        if tweet.get('coordinates'):
            # coordinates root key (if present) presents coordinates in the form [LONGITUDE, LATITUDE]
            if (coords := tweet['coordinates']['coordinates']) and len(coords) == 2:
                kwargs['coordinates'] = Coordinates(coords[0], coords[1])
        elif tweet.get('geo'):
            # coordinates root key (if present) presents coordinates in the form [LATITUDE, LONGITUDE]
            if (coords := tweet['geo']['coordinates']) and len(coords) == 2:
                kwargs['coordinates'] = Coordinates(coords[1], coords[0])
        if tweet.get('place'):
            kwargs['place'] = Place(tweet['place']['full_name'], tweet['place']['name'], tweet['place']['place_type'], tweet['place']['country'], tweet['place']['country_code'])
            if 'coordinates' not in kwargs and tweet['place']['bounding_box'] and (coords := tweet['place']['bounding_box']['coordinates']) and coords[0] and len(coords[0][0]) == 2:
                # Take the first (longitude, latitude) couple of the "place square"
                kwargs['coordinates'] = Coordinates(coords[0][0][0], coords[0][0][1])
        if tweet['entities'].get('hashtags'):
            kwargs['hashtags'] = [o['text'] for o in tweet['entities']['hashtags']]
        if tweet['entities'].get('symbols'):
            kwargs['cashtags'] = [o['text'] for o in tweet['entities']['symbols']]
        return Tweet(**kwargs)

    def _render_text_with_urls(self, text, urls):
        if not urls:
            return text
        out = []
        out.append(text[:urls[0]['indices'][0]])
        urlsSorted = sorted(urls, key = lambda x: x['indices'][0]) # Ensure that they're in left to right appearance order
        assert all(url['indices'][1] <= nextUrl['indices'][0] for url, nextUrl in zip(urls, urls[1:])), 'broken URL indices'
        for url, nextUrl in itertools.zip_longest(urls, urls[1:]):
            if 'display_url' in url:
                out.append(url['display_url'])
            out.append(text[url['indices'][1] : nextUrl['indices'][0] if nextUrl is not None else None])
        return ''.join(out)

    def _user_to_user(self, user):
        kwargs = {}
        kwargs['username'] = user['screen_name']
        kwargs['id'] = user['id'] if 'id' in user else int(user['id_str'])
        kwargs['displayname'] = user['name']
        kwargs['description'] = self._render_text_with_urls(user['description'], user['entities']['description'].get('urls'))
        kwargs['rawDescription'] = user['description']
        if user['entities']['description'].get('urls'):
            kwargs['descriptionUrls'] = [{'text': x.get('display_url'), 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in user['entities']['description']['urls']]
        kwargs['verified'] = user.get('verified')
        kwargs['created'] = email.utils.parsedate_to_datetime(user['created_at'])
        kwargs['followersCount'] = user['followers_count']
        kwargs['friendsCount'] = user['friends_count']
        kwargs['statusesCount'] = user['statuses_count']
        kwargs['favouritesCount'] = user['favourites_count']
        kwargs['listedCount'] = user['listed_count']
        kwargs['mediaCount'] = user['media_count']
        kwargs['location'] = user['location']
        kwargs['protected'] = user.get('protected')
        if 'url' in user['entities']:
            kwargs['linkUrl'] = (user['entities']['url']['urls'][0].get('expanded_url') or user.get('url'))
        kwargs['linkTcourl'] = user.get('url')
        kwargs['profileImageUrl'] = user['profile_image_url_https']
        kwargs['profileBannerUrl'] = user.get('profile_banner_url')
        return User(**kwargs)


class TwitterSearchScraper(TwitterAPIScraper):
    name = 'twitter-search'

    def __init__(self, query, cursor = None, top = False, **kwargs):
        super().__init__(baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': query, 'src': 'spelling_expansion_revert_click'}), **kwargs)
        self._query = query # Note: may get replaced by subclasses when using user ID resolution
        self._cursor = cursor
        self._top = top

    def _check_scroll_response(self, r):
        if r.status_code == 429:
            # Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
            return True, None
        if r.headers.get('content-type').replace(' ', '') != 'application/json;charset=utf-8':
            return False, 'content type is not JSON'
        if r.status_code != 200:
            return False, 'non-200 status code'
        return True, None

    def get_items(self):
        paginationParams = {
            'include_profile_interstitial_type': '1',
            'include_blocking': '1',
            'include_blocked_by': '1',
            'include_followed_by': '1',
            'include_want_retweets': '1',
            'include_mute_edge': '1',
            'include_can_dm': '1',
            'include_can_media_tag': '1',
            'skip_status': '1',
            'cards_platform': 'Web-12',
            'include_cards': '1',
            'include_ext_alt_text': 'true',
            'include_quote_count': 'true',
            'include_reply_count': '1',
            'tweet_mode': 'extended',
            'include_entities': 'true',
            'include_user_entities': 'true',
            'include_ext_media_color': 'true',
            'include_ext_media_availability': 'true',
            'send_error_codes': 'true',
            'simple_quoted_tweets': 'true',
            'q': self._query,
            'tweet_search_mode': 'live',
            'count': '100',
            'query_source': 'spelling_expansion_revert_click',
            'cursor': None,
            'pc': '1',
            'spelling_corrections': '1',
            'ext': 'ext=mediaStats%2ChighlightedLabel',
        }
        params = paginationParams.copy()
        del params['cursor']
        if self._top:
            del params['tweet_search_mode']
            del paginationParams['tweet_search_mode']
        for obj in self._iter_api_data('https://api.twitter.com/2/search/adaptive.json', params, paginationParams):
            yield from self._instructions_to_tweets(obj)

    @classmethod
    def setup_parser(cls, subparser):
        subparser.add_argument('--cursor', metavar = 'CURSOR')
        subparser.add_argument('--top', action = 'store_true', default = False, help = 'Enable fetching top tweets instead of live/chronological')
        subparser.add_argument('query', help = 'A Twitter search string')

    @classmethod
    def from_args(cls, args):
        return cls(args.query, cursor = args.cursor, top = args.top, retries = args.retries)


class TwitterUserScraper(TwitterSearchScraper):
    name = 'twitter-user'

    def __init__(self, username, isUserId, **kwargs):
        if not self.is_valid_username(username):
            raise ValueError('Invalid username')
        super().__init__(f'from:{username}', **kwargs)
        self._username = username
        self._isUserId = isUserId
        self._baseUrl = f'https://twitter.com/{self._username}' if not self._isUserId else f'https://twitter.com/i/user/{self._username}'

    def _get_entity(self):
        self._ensure_guest_token()
        if not self._isUserId:
            fieldName = 'screen_name'
            endpoint = 'https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName'
        else:
            fieldName = 'userId'
            endpoint = 'https://twitter.com/i/api/graphql/WN6Hck-Pwm-YP0uxVj1oMQ/UserByRestIdWithoutResults'
        params = {'variables': json.dumps({fieldName: self._username, 'withHighlightedLabel': True}, separators = (',', ':'))}
        obj = self._get_api_data(endpoint, params = urllib.parse.urlencode(params, quote_via = urllib.parse.quote))
        if not obj['data']:
            return None
        user = obj['data']['user']
        rawDescription = user['legacy']['description']
        description = self._render_text_with_urls(rawDescription, user['legacy']['entities']['description']['urls'])
        return User(
            username = user['legacy']['screen_name'],
            id = user['rest_id'],
            displayname = user['legacy']['name'],
            description = description,
            rawDescription = rawDescription,
            descriptionUrls = [{'text': x.get('display_url'), 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in user['legacy']['entities']['description']['urls']],
            verified = user['legacy']['verified'],
            created = email.utils.parsedate_to_datetime(user['legacy']['created_at']),
            followersCount = user['legacy']['followers_count'],
            friendsCount = user['legacy']['friends_count'],
            statusesCount = user['legacy']['statuses_count'],
            favouritesCount = user['legacy']['favourites_count'],
            listedCount = user['legacy']['listed_count'],
            mediaCount = user['legacy']['media_count'],
            location = user['legacy']['location'],
            protected = user['legacy']['protected'],
            linkUrl = user['legacy']['entities']['url']['urls'][0]['expanded_url'] if 'url' in user['legacy']['entities'] else None,
            linkTcourl = user['legacy'].get('url'),
            profileImageUrl = user['legacy']['profile_image_url_https'],
            profileBannerUrl = user['legacy'].get('profile_banner_url'),
        )

    def get_items(self):
        if self._isUserId:
            # Resolve user ID to username
            self._username = self.entity.username
            self._isUserId = False
            self._query = f'from:{self._username}'
        yield from super().get_items()

    @staticmethod
    def is_valid_username(s):
        return (1 <= len(s) <= 15 and s.strip(string.ascii_letters + string.digits + '_') == '') or (s and s.strip(string.digits) == '')

    @classmethod
    def setup_parser(cls, subparser):
        def username(s):
            if cls.is_valid_username(s):
                return s
            raise ValueError('Invalid username')
        subparser.add_argument('--user-id', dest = 'isUserId', action = 'store_true', default = False, help = 'Use user ID instead of username')
        subparser.add_argument('username', type = username, help = 'A Twitter username (without @)')

    @classmethod
    def from_args(cls, args):
        return cls(args.username, args.isUserId, retries = args.retries)


class TwitterProfileScraper(TwitterUserScraper):
    name = 'twitter-profile'

    def get_items(self):
        if not self._isUserId:
            userId = self.entity.id
        else:
            userId = self._username
        paginationParams = {
            'include_profile_interstitial_type': '1',
            'include_blocking': '1',
            'include_blocked_by': '1',
            'include_followed_by': '1',
            'include_want_retweets': '1',
            'include_mute_edge': '1',
            'include_can_dm': '1',
            'include_can_media_tag': '1',
            'skip_status': '1',
            'cards_platform': 'Web-12',
            'include_cards': '1',
            'include_ext_alt_text': 'true',
            'include_quote_count': 'true',
            'include_reply_count': '1',
            'tweet_mode': 'extended',
            'include_entities': 'true',
            'include_user_entities': 'true',
            'include_ext_media_color': 'true',
            'include_ext_media_availability': 'true',
            'send_error_codes': 'true',
            'simple_quoted_tweets': 'true',
            'include_tweet_replies': 'true',
            'userId': userId,
            'count': '100',
            'cursor': None,
            'ext': 'ext=mediaStats%2ChighlightedLabel',
        }
        params = paginationParams.copy()
        del params['cursor']
        for obj in self._iter_api_data(f'https://api.twitter.com/2/timeline/profile/{userId}.json', params, paginationParams):
            yield from self._instructions_to_tweets(obj)


class TwitterHashtagScraper(TwitterSearchScraper):
    name = 'twitter-hashtag'

    def __init__(self, hashtag, **kwargs):
        super().__init__(f'#{hashtag}', **kwargs)
        self._hashtag = hashtag

    @classmethod
    def setup_parser(cls, subparser):
        subparser.add_argument('hashtag', help = 'A Twitter hashtag (without #)')

    @classmethod
    def from_args(cls, args):
        return cls(args.hashtag, retries = args.retries)


class TwitterTweetScraperMode(enum.Enum):
    SINGLE = 'single'
    SCROLL = 'scroll'
    RECURSE = 'recurse'

    @classmethod
    def from_args(cls, args):
        if args.scroll:
            return cls.SCROLL
        if args.recurse:
            return cls.RECURSE
        return cls.SINGLE


class TwitterTweetScraper(TwitterAPIScraper):
    name = 'twitter-tweet'

    def __init__(self, tweetId, mode, **kwargs):
        self._tweetId = tweetId
        self._mode = mode
        super().__init__(f'https://twitter.com/i/web/{self._tweetId}', **kwargs)

    def get_items(self):
        paginationParams = {
            'include_profile_interstitial_type': '1',
            'include_blocking': '1',
            'include_blocked_by': '1',
            'include_followed_by': '1',
            'include_want_retweets': '1',
            'include_mute_edge': '1',
            'include_can_dm': '1',
            'include_can_media_tag': '1',
            'skip_status': '1',
            'cards_platform': 'Web-12',
            'include_cards': '1',
            'include_ext_alt_text': 'true',
            'include_quote_count': 'true',
            'include_reply_count': '1',
            'tweet_mode': 'extended',
            'include_entities': 'true',
            'include_user_entities': 'true',
            'include_ext_media_color': 'true',
            'include_ext_media_availability': 'true',
            'send_error_codes': 'true',
            'simple_quoted_tweet': 'true',
            'count': '20',
            'cursor': None,
            'include_ext_has_birdwatch_notes': 'false',
            'ext': 'mediaStats%2ChighlightedLabel',
        }
        params = paginationParams.copy()
        del params['cursor']
        if self._mode is TwitterTweetScraperMode.SINGLE:
            obj = self._get_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params)
            yield self._tweet_to_tweet(obj['globalObjects']['tweets'][str(self._tweetId)], obj)
        elif self._mode is TwitterTweetScraperMode.SCROLL:
            for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params, paginationParams, direction = ScrollDirection.BOTH):
                yield from self._instructions_to_tweets(obj, includeConversationThreads = True)
        elif self._mode is TwitterTweetScraperMode.RECURSE:
            seenTweets = set()
            queue = collections.deque()
            queue.append(self._tweetId)
            while queue:
                tweetId = queue.popleft()
                for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{tweetId}.json', params, paginationParams, direction = ScrollDirection.BOTH):
                    for tweet in self._instructions_to_tweets(obj, includeConversationThreads = True):
                        if tweet.id not in seenTweets:
                            yield tweet
                            seenTweets.add(tweet.id)
                            if tweet.replyCount:
                                queue.append(tweet.id)

    @classmethod
    def setup_parser(cls, subparser):
        group = subparser.add_mutually_exclusive_group(required = False)
        group.add_argument('--scroll', action = 'store_true', default = False, help = 'Enable scrolling in both directions')
        group.add_argument('--recurse', '--recursive', action = 'store_true', default = False, help = 'Enable recursion through all tweets encountered (warning: slow, potentially memory-intensive!)')
        subparser.add_argument('tweetId', type = int, help = 'A tweet ID')

    @classmethod
    def from_args(cls, args):
        return cls(args.tweetId, TwitterTweetScraperMode.from_args(args), retries = args.retries)


class TwitterListPostsScraper(TwitterSearchScraper):
    name = 'twitter-list-posts'

    def __init__(self, listName, **kwargs):
        super().__init__(f'list:{listName}', **kwargs)
        self._listName = listName

    @classmethod
    def setup_parser(cls, subparser):
        subparser.add_argument('list', help = 'A Twitter list ID or a string of the form "username/listname" (replace spaces with dashes)')

    @classmethod
    def from_args(cls, args):
        return cls(args.list, retries = args.retries)

Just keep this scraper script in the same folder as the base file so that the import twitter_base line resolves; nothing else is needed.
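
As a minimal usage sketch (my own example, not from the original post), this is how the scrapers above are called from your own code. The module name twitter_scraper, the username jack, the output file name and the 500-tweet cap are all placeholders; adjust them to whatever you saved the script as and whoever you want to scrape.

import csv
import twitter_scraper  # hypothetical name for the scraper script file above

with open('tweets.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow(['date', 'url', 'content'])
    scraper = twitter_scraper.TwitterUserScraper('jack', isUserId=False)
    for i, tweet in enumerate(scraper.get_items()):
        writer.writerow([tweet.date.isoformat(), tweet.url, tweet.content])
        if i >= 499:  # stop after roughly 500 tweets
            break

# The same pattern works for the other scrapers defined above, e.g.
#   twitter_scraper.TwitterSearchScraper('some keyword')  -> tweets containing a keyword (item 2 of the overview)
#   twitter_scraper.TwitterTweetScraper(1234567890, twitter_scraper.TwitterTweetScraperMode.SCROLL)
#                                                          -> the replies under one tweet (item 6 of the overview)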

2. How to scrape follower information

This part uses the official Twitter API via tweepy. The example below uses the neuro_scand account; the output path also needs to be changed to your own.

# Scrape a user's followers; each page returns 20 records.
# The only value you need to change is the user_name passed to get_list(); nothing else matters.
# Note: api.followers is the tweepy 3.x name; in tweepy 4.x it was renamed to API.get_followers.
import tweepy
import csv

ids = []

def get_list(user_name):
    user_id = user_name
    # Replace these with your own Twitter API credentials.
    consumer_key = 'e1232137p'
    consumer_secret = 'bUK4lzM123cWT2LiVUQHuCRDqMx5'
    access_token = '3388759955-3yNxjWBNns23QfQyOZ9uvZvsN9brZg'
    access_token_secret = 'R8TGCOVGhNn123LmiYTg091Pd10vhpks'
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)
    print(api)
    for page in tweepy.Cursor(api.followers, screen_name=user_id).pages():
        print("Starting to count")
        ids.extend(page)
        print(len(ids))
    return ids

get_list('neuro_scand')
print(len(ids))

def read_list(name):
    f = open('/root/scrapy_medical/neuro36/' + name + '.csv', 'a+', encoding='utf-8')
    csv_writer = csv.writer(f)
    csv_writer.writerow(["Name", "Location", "Description"])
    for i in range(len(ids)):
        list_xr = []
        print('Name - ' + ids[i].name)
        print('Bio - ' + ids[i].description)
        print('Location - ' + ids[i].location)
        list_xr.append(ids[i].name)
        list_xr.append(ids[i].location)
        list_xr.append(ids[i].description)
        csv_writer.writerow(list_xr)
        list_xr.clear()

read_list("neuro_scand")
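
Item 3 of the overview also mentions going one level deeper (the followers of each follower). The original post does not show that step, so here is a sketch of the idea under the same tweepy 3.x assumptions as above. get_api() is a hypothetical helper standing in for the authentication code already shown, the credential strings are placeholders, and the per_account cap of 10 is my own choice to keep the rate-limit cost manageable (the followers endpoint only allows 15 requests per 15 minutes, so nested crawls are slow).

import tweepy

def get_api():
    # Placeholder credentials; use your own.
    auth = tweepy.OAuthHandler('CONSUMER_KEY', 'CONSUMER_SECRET')
    auth.set_access_token('ACCESS_TOKEN', 'ACCESS_TOKEN_SECRET')
    return tweepy.API(auth, wait_on_rate_limit=True)

def followers_of_followers(screen_name, per_account=10):
    api = get_api()
    result = {}
    for follower in tweepy.Cursor(api.followers, screen_name=screen_name).items(per_account):
        try:
            result[follower.screen_name] = [
                u.screen_name
                for u in tweepy.Cursor(api.followers, screen_name=follower.screen_name).items(per_account)
            ]
        except tweepy.TweepError as e:  # e.g. protected accounts
            print(follower.screen_name, e)
            result[follower.screen_name] = []
    return result

print(followers_of_followers('neuro_scand'))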

3. How to scrape tweets by geographic location

I am not allowed to post any more here, so the code has been uploaded to the netdisk.

Link: https://pan.baidu.com/s/1MlFF7zbTr_nt8xFJPpkzDg

Extraction code: wv71
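
The uploaded geolocation code is not reproduced here, but as a rough idea of the approach, here is a minimal sketch of my own, assuming the TwitterSearchScraper from the script above: to my knowledge Twitter's search syntax accepts location operators such as geocode:<lat>,<lon>,<radius> (and near:"place" within:Xkm), so location-based collection can be expressed as an ordinary search query. The module name twitter_scraper, the coordinates (central Beijing, 10 km) and the 100-tweet cap are placeholders, not the values used in the netdisk code.

import twitter_scraper  # hypothetical name for the scraper script file above

query = 'geocode:39.9042,116.4074,10km'
for i, tweet in enumerate(twitter_scraper.TwitterSearchScraper(query).get_items()):
    print(tweet.date, tweet.coordinates, tweet.place, tweet.content[:80])
    if i >= 99:  # stop after 100 tweets
        break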
