编辑
2023-09-21
Python
00
请注意,本文编写于 74 天前,最后修改于 74 天前,其中某些信息可能已经过时。

相关信息

本周主要在为开源项目 ToolBench 的 todo 做贡献,需要中文的 API 数据。在搜寻国内的开放平台之后,发现聚合数据的 API 最合适,原因是种类多,且以 GET 类型为主。本文主要分享一下经验和源码。

聚合数据 API 官网做了反爬:我刚开始爬取时 sleep 时间为 1 秒,没过一会就遇到了 405 错误;后来把间隔加到 5 秒,并使用随机 UA 之后便正常了。我这里开了代理,代理端口是 7890;如果没有代理,可以去掉这部分配置,不过建议还是开启代理,并在多个代理之间来回切换。

utils.py

python
# -*- coding: utf-8 -*-
"""Scrape juhe.cn API documentation and generate Python wrapper modules.

Pipeline: collect category index pages -> collect per-API detail URLs
(GET-type APIs only) -> fetch each API's parameter table -> emit one
``api.py`` per category under ``<BASE_PATH>/juhe_tools``.
"""
import json
import keyword
import os
from time import sleep
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent

from juhe.translate import do_zh_to_en

BASE_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Local HTTP proxy on port 7890 (e.g. clash). Remove if you do not run one.
proxies = {
    'http': 'http://127.0.0.1:7890',
    'https': 'http://127.0.0.1:7890',
}


def get_random_ua() -> str:
    """Return a random User-Agent string (the site 405s repeated UAs)."""
    ua = UserAgent()  # User-Agent factory
    return ua.random


def post_to_api(url: str) -> str:
    """GET *url* and return the response body text.

    On a 405 (anti-scraping) response, back off for an hour and retry.

    BUG FIX: the original recursed on 405 but then fell through and
    returned the stale 405 response's text; we now return the retried
    call's result.

    :param url: absolute URL to fetch.
    :return: response body as text.
    """
    headers = {'User-Agent': get_random_ua()}
    response = requests.request("GET", url, headers=headers, proxies=proxies)
    if response.status_code == 405:
        sleep(3600)
        return post_to_api(url)
    return response.text


def get_api_doc_urls() -> list[dict]:
    """Collect every API category page URL from the docs index pages.

    :return: list of ``{"api_category": <name>, "href": <absolute url>}``.
    """
    page_list = [f"https://www.juhe.cn/docs/{page_index}" for page_index in range(1, 20)]
    api_category_urls = []
    visited_urls = set()  # de-duplicate hrefs repeated across index pages
    for page_url in page_list:
        headers = {'User-Agent': get_random_ua()}
        response = requests.request("GET", page_url, headers=headers, proxies=proxies)
        soup = BeautifulSoup(response.text, 'html.parser')
        for a in soup.find_all('a', class_='api-a-block'):
            href = a.get('href')
            if href and href not in visited_urls:
                visited_urls.add(href)
                if not href.startswith('http'):
                    href = 'https://www.juhe.cn' + href
                h2 = a.find('h2', class_='api-name')
                if h2:
                    print(h2.text)
                    api_category_urls.append({
                        "api_category": h2.text,
                        "href": href
                    })
                    print(f"已解析分类地址{href}")
        sleep(5)  # throttle: 5s between pages avoids the 405 anti-bot response
    return api_category_urls


def get_api_des_url(urls: list[dict]) -> list[dict]:
    """Extract (id1, id2) pairs from each category page and build the
    detail-endpoint URLs used to fetch each API's documentation.

    Only GET-type APIs (identified by the method badge image) are kept.

    :param urls: output of :func:`get_api_doc_urls`.
    :return: list of ``{"api_category": ..., "api_href_list": [...]}``.
    """
    api_callable_url = []
    for category in urls:  # renamed from ``url`` to avoid shadowing below
        headers = {'User-Agent': get_random_ua()}
        response = requests.request("GET", category.get("href"), headers=headers, proxies=proxies)
        soup = BeautifulSoup(response.text, 'html.parser')
        p_tags = soup.find_all('p', attrs={'data-id1': True, 'data-id2': True})
        result = {
            "api_category": category.get("api_category"),
            "api_href_list": []
        }
        for p_tag in p_tags:
            img_tag = p_tag.find('img')
            # The request-method badge is an <img>; keep GET APIs only.
            if img_tag and 'GET' in img_tag.get('src', ''):
                data_id1 = p_tag.get('data-id1')
                data_id2 = p_tag.get('data-id2')
                if data_id1 and data_id2:
                    url = urljoin('https://www.juhe.cn/docs/api/id/', data_id1) + '/aid/' + data_id2 + '/' + data_id1
                    result.get("api_href_list").append(url)
                    print(f"已解析api调用地址{url}")
        sleep(5)  # throttle between category pages
        api_callable_url.append(result)
    return api_callable_url


def get_param(url: str) -> dict:
    """Fetch one API's documentation fragment and parse it.

    The detail endpoint returns JSON whose ``result.html`` field holds the
    documentation HTML; name, endpoint URL, request header and parameter
    table are scraped from it.

    :param url: detail URL built by :func:`get_api_des_url`.
    :return: ``{"api_name", "url", "header", "params"}``.
    :raises RuntimeError: if the HTML layout does not match expectations.
    """
    try:
        json_obj = json.loads(post_to_api(url))
        html_content = json_obj["result"]["html"]
        soup = BeautifulSoup(html_content, 'html.parser')
        api_name = soup.find('div', class_="docs-top-title").text
        # Endpoint URL is the sibling of the "接口地址:" label.
        api_url = soup.find('b', string="接口地址:").find_next_sibling().text
        # Request header: first table after the "请求Header:" paragraph.
        p_tag = soup.find('p', string="请求Header:")
        table = p_tag.find_next('table')
        tds = table.find_all('td')
        # tds[1]/tds[2] hold e.g. Content-Type / application/x-www-form-urlencoded.
        content_type = tds[1].text
        application = tds[2].text
        header = content_type + ":" + application
        # Request parameters: table after "请求参数说明:", skipping the header row.
        p_tag = soup.find('p', string="请求参数说明:")
        table = p_tag.find_next('table')
        rows = table.find_all('tr')[1:]
        request_params = []
        for row in rows:
            tds = row.find_all('td')
            if tds[1].text == "key":  # the API key row is handled separately
                continue
            info = {
                "名称": tds[1].text,
                "必填": tds[2].text,
                "类型": tds[3].text,
                "说明": tds[4].text.replace('\n', ' ').strip(),  # strip newlines/whitespace
            }
            request_params.append(info)
            print(f"已添加api信息{info}")
        result = {
            "api_name": api_name,
            "url": api_url,
            "header": header,
            "params": request_params
        }
        return result
    except Exception as e:
        print(e)
        raise RuntimeError(e)


def format_to_python_func_name(text: str) -> str:
    """Convert free text to a snake_case Python identifier.

    :param text: e.g. an English API name ("Mobile Number Lookup").
    :return: lowercase words joined by '_', non [a-z0-9_] chars removed.
    """
    # Lowercase and split into words.
    words = text.lower().split()
    # Join with underscores.
    function_name = "_".join(words)
    # Drop anything that is not alphanumeric or underscore.
    function_name = "".join(c for c in function_name if c.isalnum() or c == '_')
    return function_name


def write_api_file(api_info_list: list[dict]):
    """Generate one ``api.py`` wrapper module per API category.

    Each generated module defines one function per GET API, named after the
    English translation of the Chinese API name, with required parameters
    first and optional ones defaulting to None.

    :param api_info_list: output of :func:`get_api_des_url`.
    """
    tool_dir = os.path.join(BASE_PATH, "juhe_tools")
    for category in api_info_list:
        category_name = category.get("api_category")
        if category_name[0].isdigit():  # directory name must not start with a digit
            category_name = "_" + category_name
        if '/' in category_name:
            # BUG FIX: str.replace returns a new string; the original
            # discarded the result, leaving '/' in the directory name.
            category_name = category_name.replace('/', "")
        api_url_list = category.get("api_href_list")
        category_dir = os.path.join(tool_dir, category_name)
        os.makedirs(category_dir, exist_ok=True)
        api_file_content = """
import requests
"""
        for api_url in api_url_list:
            api_param = get_param(api_url)
            sleep(5)  # throttle the doc-detail endpoint
            endpoint = api_param.get("url")
            header = api_param.get("header")
            api_name = api_param.get("api_name")
            api_name_en = format_to_python_func_name(do_zh_to_en(api_name))
            if api_name_en[0].isdigit():  # function name must not start with a digit
                api_name_en = "_" + api_name_en
            params_list = api_param.get("params")
            # Build the generated function's parameter list.
            required_params = ["key"]
            optional_params = []
            for param in params_list:
                param_name = param.get("名称")
                if not param_name:  # skip blank rows
                    continue
                param_name = param_name.replace("-", "_")
                if keyword.iskeyword(param_name):  # avoid Python keywords
                    param_name += "_"
                is_required = param.get("必填")
                if is_required == "是":
                    required_params.append(f"{param_name}")
                else:
                    optional_params.append(f"{param_name}=None")
            # Build the body that fills the query-string dict.
            params_str = f"""
    if key is not None:
        params['key'] = key
"""
            for param in params_list:
                param_name = param.get("名称")
                if not param_name:  # skip blank rows
                    continue
                param_name_in_func = param_name.replace("-", "_")
                if keyword.iskeyword(param_name_in_func):  # avoid Python keywords
                    param_name_in_func += "_"
                params_str += f"""
    if {param_name_in_func} is not None:
        params['{param_name}'] = {param_name_in_func}
"""
            # BUG FIX: split on the first ':' only, so header values that
            # themselves contain ':' do not raise on tuple unpacking.
            header_key, header_value = header.split(":", 1)
            headers = {header_key.strip(): header_value.strip()}
            # Emit the wrapper function.
            api_file_content += f"""
def {api_name_en}({', '.join(required_params + optional_params)}) -> str:
    url = f"{endpoint}"
    params = {{}}
    headers = {headers}
    {params_str}
    response = requests.get(url, headers=headers, params=params)
    return response.text
"""
        # Explicit UTF-8 so Chinese text survives regardless of platform default.
        api_file_path = os.path.join(category_dir, "api.py")
        with open(api_file_path, 'w', encoding='utf-8') as api_file:
            api_file.write(api_file_content)

main.py

python
"""Entry point: scrape juhe.cn API docs and generate wrapper modules."""
from utils import (
    get_api_doc_urls,
    get_api_des_url,
    write_api_file
)


def main():
    """Run the full pipeline: category pages -> API detail URLs -> files."""
    # Step 1: gather every API category page.
    category_urls = get_api_doc_urls()
    # Step 2: resolve each category into its per-API detail URLs.
    detail_urls = get_api_des_url(urls=category_urls)
    # Step 3: generate one wrapper module per category.
    write_api_file(detail_urls)


if __name__ == '__main__':
    main()
如果对你有用的话,可以打赏哦
打赏
ali pay
wechat pay

本文作者:MrBun

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!