import asyncio
import time
import pymysql
import json
import re
import logging
from enum import Enum
from datetime import datetime
from typing import Optional
from traceback import format_exc
# 用patchright替换playwright
from patchright.async_api import async_playwright, Frame
from patchright.async_api import Error as PlaywrightError
# Process-wide logging setup: INFO level, each record rendered as
# "<time> - <logger name> - <level> - <message>".
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Named logger used by the code in this module.
logger = logging.getLogger('GMGN Holders Tag')
class ChallengePlatform(Enum):
    """
    Kinds of Cloudflare challenge this module knows how to recognise.

    Each member's value is the string that is searched for in the page
    HTML as ``cType: '<value>'`` when detecting a challenge.
    """

    # Challenge reported by the page as "non-interactive".
    JAVASCRIPT = 'non-interactive'
    # Challenge reported by the page as "managed".
    MANAGED = 'managed'
    # Challenge reported by the page as "interactive".
    INTERACTIVE = 'interactive'
class PumpRanks:
    """
    Browser-driven spider that captures GMGN pump-rank API responses.

    Author's stated goal: fetch the top-100 holders of trending tokens from
    GMGN together with their tags and historical performance.  The code in
    this class opens a headed Chromium page, gets past the Cloudflare
    challenge if one is shown, and parses every intercepted
    ``pump_ranks`` response into per-token rows.

    Parameters
    ----------
    env : str, optional
        Runtime environment selector used by :meth:`task`.  ``'local'`` runs
        the browser directly; any other value runs it inside a virtual
        display.  When omitted, a module-level ``env`` variable is used if
        one exists (the script entry point sets it), else ``'local'``.
    """

    spider_name = 'gmgn_tags'

    # URL fragment that identifies the API response to capture.
    _RANK_ENDPOINT = '/v1/rank/sol/pump_ranks/1h'

    # Accessible name of the "verify" button on the challenge page.
    _VERIFY_BUTTON_PATTERN = re.compile(
        "Verify (I am|you are) (not a bot|(a )?human)"
    )

    # URL of the embedded Cloudflare turnstile iframe.
    _TURNSTILE_URL_PATTERN = re.compile(
        "https://challenges.cloudflare.com/cdn-cgi/challenge-platform/h/[bg]/turnstile"
    )

    # Statement intended for persisting one parsed row (see on_response).
    _INSERT_SQL = """
        INSERT INTO pump_token_info (
            address, symbol, usd_market_cap, created_timestamp,
            holder_count, top_10_holder_rate, twitter, website, crawler_timestamp
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
    """

    def __init__(self, env=None):
        # Seconds allowed for solving a challenge and for Playwright waits.
        self._timeout = 30
        # Explicit environment override; None means "resolve lazily in task()".
        self._env = env

    @staticmethod
    def _build_row(token, crawler_timestamp):
        """
        Convert one ``completeds`` entry into an insert-ready tuple.

        The tuple order matches the column list of ``_INSERT_SQL``.

        Raises
        ------
        KeyError
            If a mandatory field is missing from ``token``.
        TypeError, ValueError
            If ``created_timestamp`` cannot be converted to an integer.
        """
        created_at = datetime.fromtimestamp(int(token['created_timestamp']))
        return (
            token['address'],
            token['symbol'],
            token['usd_market_cap'],
            created_at,
            token['holder_count'],
            token['top_10_holder_rate'],
            token.get('twitter', '无'),
            token.get('website', '无'),
            crawler_timestamp,
        )

    async def on_response(self, response):
        """
        Response listener: parse pump_ranks payloads as they arrive.

        Non-OK responses and responses for other URLs are ignored.  The
        payload layout is described by the project's ``gmgn.json`` sample.
        A malformed payload or entry is logged and skipped instead of
        raising inside the event handler and dropping the rest of the batch.
        """
        if not response.ok:
            return
        if self._RANK_ENDPOINT not in response.url:
            return

        logger.info(f'捕获 pump_ranks 数据接口: {response.url}')
        try:
            payload = json.loads(await response.body())
            completeds = payload['data']['completeds']
        except (ValueError, KeyError, TypeError, PlaywrightError) as err:
            logger.warning('Unable to read pump_ranks payload: %r', err)
            return

        # One crawl timestamp (local time) shared by every row of this batch.
        crawler_timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
        for token in completeds or []:
            logger.info(f'代币--> {token}')
            try:
                row = self._build_row(token, crawler_timestamp)
            except (KeyError, TypeError, ValueError, OverflowError, OSError) as err:
                logger.warning('Skipping malformed pump_ranks entry: %r', err)
                continue
            # TODO: persistence is disabled -- this class never creates a DB
            # connection/cursor.  Once one exists, execute _INSERT_SQL with
            # `row` and commit here.
            logger.debug('Parsed pump_token_info row: %s', row)

    def _get_turnstile_frame(self, page) -> Optional["Frame"]:
        """
        Get the Cloudflare turnstile frame.

        Returns
        -------
        Optional[Frame]
            The frame whose URL matches the turnstile pattern, or ``None``
            when the page has no such frame.
        """
        return page.frame(url=self._TURNSTILE_URL_PATTERN)

    async def cookies(self, page) -> Optional[str]:
        """
        Return the value of the ``cf_clearance`` cookie for the page's context.

        Returns ``None`` when the context has no cookies or no cookie with
        that name.
        """
        for cookie in await page.context.cookies() or ():
            if cookie["name"] == "cf_clearance":
                return cookie["value"]
        return None

    async def detect_challenge(self, page) -> Optional[str]:
        """
        Detect the Cloudflare challenge platform on the current page.

        Returns
        -------
        Optional[str]
            The ``value`` of the first matching ``ChallengePlatform`` member
            (e.g. ``"managed"``), or ``None`` when the page HTML contains no
            ``cType`` marker for any known platform.
        """
        html = await page.content()
        for platform in ChallengePlatform:
            if f"cType: '{platform.value}'" in html:
                return platform.value
        return None

    async def solve_challenge(self, page) -> None:
        """
        Solve the Cloudflare challenge on the current page.

        Loops until the clearance cookie appears, the challenge markup
        disappears, or ``self._timeout`` seconds have elapsed.  Both exit
        conditions are re-evaluated on every iteration; checking them only
        once before the loop would keep it spinning until the timeout even
        after the challenge has been passed.
        """
        verify_button = page.get_by_role("button", name=self._VERIFY_BUTTON_PATTERN)
        challenge_spinner = page.locator("#challenge-spinner")
        challenge_stage = page.locator("#challenge-stage")

        # Monotonic deadline: immune to wall-clock adjustments.
        deadline = time.monotonic() + self._timeout
        while time.monotonic() < deadline:
            if await self.cookies(page) is not None:
                break
            if await self.detect_challenge(page) is None:
                break

            if await challenge_spinner.is_visible():
                await challenge_spinner.wait_for(state="hidden")

            turnstile_frame = self._get_turnstile_frame(page)

            if await verify_button.is_visible():
                await verify_button.click()
                await challenge_stage.wait_for(state="hidden")
            elif turnstile_frame is not None:
                # Fixed viewport coordinates used to click the turnstile widget.
                await page.mouse.click(210, 290)
                await challenge_stage.wait_for(state="hidden")

            # Short pause before polling the page state again.
            await page.wait_for_timeout(250)

    async def detect(self, page):
        """
        Get past the Cloudflare check if the page is currently showing one.

        Does nothing when a clearance cookie already exists.  Playwright
        errors raised while solving are logged, not propagated.
        """
        if await self.cookies(page) is not None:
            return

        challenge_platform = await self.detect_challenge(page)
        if challenge_platform is None:
            logger.error("No Cloudflare challenge detected.")
            return

        logger.info(f"Solving Cloudflare challenge [{challenge_platform}]...")
        try:
            await self.solve_challenge(page)
        except PlaywrightError as err:
            logger.error(err)

    async def run_local(self, proxy=None):
        """
        Launch a headed Chromium session and keep it capturing responses.

        Parameters
        ----------
        proxy : dict, optional
            Passed straight through to the ``proxy`` launch option.
        """
        async with async_playwright() as p:
            # Must be a headed browser, otherwise Cloudflare cannot be passed.
            launch_options = {
                "headless": False,
                "proxy": proxy,
                "args": [
                    '--disable-blink-features=AutomationControlled',
                    '--disable-dev-shm-usage',
                    '--no-first-run',
                    '--no-default-browser-check',
                    '--disable-infobars',
                    '--disable-extensions',
                    '--remote-debugging-port=9222',
                    '--disable-features=VizDisplayCompositor',
                ],
            }
            user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
            browser = await p.chromium.launch(**launch_options)
            try:
                context = await browser.new_context(user_agent=user_agent)
                # Playwright expects milliseconds.
                context.set_default_timeout(self._timeout * 1000)
                page = await context.new_page()
                # Listen to the response stream.
                page.on('response', self.on_response)
                url = 'https://larkfive.sg.larksuite.com/wiki/Yrh5wmnEji6h4nkjjLflj8BJgrc'
                # Visit the target address.
                await page.goto(url)
                # Anti-bot workaround: without this reload the page stays blocked.
                await page.reload()
                await asyncio.sleep(10)
                await self.detect(page)
                # Keep the session open: an in-page timer sets window.x after
                # 24 hours (the original comment said 1 hour, but the timer
                # value is 24 * 60 * 60 * 1000 ms) and we wait for that flag
                # with no Playwright timeout.
                await page.evaluate("setTimeout(() => window.x = 5, 24 * 60 * 60 * 1000)")
                await page.wait_for_function("() => window.x > 0", timeout=0)
            finally:
                # Close the browser explicitly on both normal exit and error.
                await browser.close()

    async def run_aws(self):
        """
        Run the browser session on an AWS server inside a virtual display.

        Failures of the browser session are logged with a traceback and
        swallowed so the outer scheduling loop can start a new session.
        """
        # Imported lazily: only needed on display-less servers.
        from pyvirtualdisplay import Display
        with Display():
            try:
                await self.run_local()
            except Exception:
                # `Exception` rather than a bare except, so KeyboardInterrupt,
                # SystemExit and task cancellation are not swallowed.
                logger.error(f'浏览器异常:{format_exc()}')

    def task(self):
        """
        Run one browser session in the configured environment.

        The environment is the ``env`` constructor argument when given,
        otherwise the module-level ``env`` variable when defined, otherwise
        ``'local'``.
        """
        env_name = self._env
        if env_name is None:
            env_name = globals().get('env', 'local')

        if env_name == 'local':
            asyncio.run(self.run_local())
        else:
            asyncio.run(self.run_aws())

    def run(self):
        """Run sessions back to back forever, pausing 60 seconds in between."""
        while True:
            self.task()
            logger.info('浏览器等待下一次启动')
            time.sleep(60)
# Script entry point: start the endless crawl loop in local mode.
if __name__ == '__main__':
    # Module-level switch read by PumpRanks.task(): 'local' runs the browser
    # directly, any other value runs it inside a virtual display.
    env = 'local'
    spider = PumpRanks()
    spider.run()