- Create an OSS bucket
- In the authorization policy, select the sub-user created earlier and grant it full permissions
- Deploy the code following the same steps as before
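Before deploying, it is worth verifying that the sub-user's AccessKey can actually reach the bucket. Below is a minimal round-trip sketch; the endpoint, bucket name, and key placeholders are assumptions to be filled in, not real values:

import oss2

auth = oss2.Auth('<AccessKey ID>', '<AccessKey Secret>')
# endpoint shown is just an example region
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', '<bucket-name>')
bucket.put_object('connectivity-test.txt', b'hello')      # write a test object
print(bucket.get_object('connectivity-test.txt').read())  # read it back
bucket.delete_object('connectivity-test.txt')             # clean up

If this round-trips without an AccessDenied error, the authorization policy is working. The full function script: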
# -*- coding: utf-8 -*-
import logging
import requests
import re
import oss2
from io import StringIO
import csv
# Constants: fill these in before deploying
OSS_ENDPOINT = ''     # OSS endpoint, e.g. 'https://oss-cn-hangzhou.aliyuncs.com'
OSS_BUCKET_NAME = ''  # OSS bucket name
OSS_ID = ''           # AccessKey ID of the RAM sub-user
OSS_SECRET = ''       # AccessKey Secret of the RAM sub-user
API_KEY = ''          # WeChat Work group-bot webhook key
# To switch to different feeds, update the following 3 constants
TABLE_NAME = 'MgRss' # Table name
XMOL_URL = 'https://www.x-mol.com/paper/search/result?searchLogId=96949c5b-af56-4e42-bb96-4999ab366b97&searchSort=publishDate&readMode=zh'
PUBMED_URL = 'https://pubmed.ncbi.nlm.nih.gov/rss/search/1LWII-KBKMy5fadnznHlMGiT92PGjMjI5y0QlvnPsdTdofc1sJ/?limit=15&utm_campaign=pubmed-2&fc=20240517035732'
# Global list of DOIs that have already been processed and pushed to WeChat
processed_links = []
def save_to_oss(bucket, file_name, content):
"""
Save content to OSS.
"""
bucket.put_object(file_name, content)
print(f'File {file_name} has been uploaded to OSS.')
def load_from_oss(bucket, file_name):
"""
Load content from OSS.
"""
try:
content = bucket.get_object(file_name).read()
return content.decode('utf-8')
except oss2.exceptions.NoSuchKey:
return None
except UnicodeDecodeError as e:
print(f'Error decoding content from OSS: {e}')
return None
def put_rows(bucket, articles):
"""
Insert multiple rows into the CSV file in OSS.
"""
file_name = f'{TABLE_NAME}.csv'
content = load_from_oss(bucket, file_name)
if content:
rows = list(csv.reader(StringIO(content)))
else:
rows = []
for article in articles:
row = [article["doi"], article["title"], article["pub_date"], article["journal"], article["impact_factor"], article["authors"], article["abstract"]]
rows.append(row)
output = StringIO()
writer = csv.writer(output)
writer.writerows(rows)
save_to_oss(bucket, file_name, output.getvalue().encode('utf-8'))
print(f'{len(articles)} rows have been added to the CSV file.')
# for article in articles:
# print(f'Row with DOI {article["doi"]} has been added.')
def get_range(bucket):
    """
    Load the DOIs of already-processed articles from the CSV file in OSS.
    """
    global processed_links
    file_name = f'{TABLE_NAME}.csv'
    content = load_from_oss(bucket, file_name)
    if content:
        rows = list(csv.reader(StringIO(content)))
        for row in rows:
            if row and row[0] != 'doi':  # skip blank lines and the header row
                processed_links.append(row[0])
    return processed_links
def get_latest_items(url, source):
"""
Get the latest items from the given URL based on the source.
"""
    if source == 'xmol':
        # Scrapes the raw HTML of the X-MOL search result page; brittle if the markup changes
        pattern = r"""<div class="it-bold space-bottom-m10">.*?</span>(.*?)</div>.*?<a target="_blank" onclick=.*? href=(.*?)>.*?<div class="div-text-line-one it-new-gary">.*?<em class="it-blue">(.*?)</em>.*?<span style="color: #FF7010;">(.*?)</span>.*?Pub Date : (.*?), DOI:(.*?)</div>.*?<div class="div-text-line-one it-new-gary">(.*?)</div>.*?<div class="div-text-line-three itsmlink">(.*?)</div>.*?"""
    elif source == 'pubmed':
        # Parses the PubMed RSS XML with a regex; brittle if the feed format changes
        pattern = r"""<item>.*?<title>(.*?)</title>.*?<content:encoded>.*?<p xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:mml="http://www.w3.org/1998/Math/MathML" xmlns:p1="http://pubmed.gov/pub-one">(.*?)</p>.*?DOI:<a href=(.*?)>.*?<dc:creator>(.*?)</dc:creator>.*?<dc:date>(.*?)</dc:date>.*?<dc:source>(.*?)</dc:source>"""
    matches = re.findall(pattern, requests.get(url, timeout=30).text, re.DOTALL)
return matches
def get_message_content(match, source):
"""
Format the message content based on the source.
"""
if source == 'xmol':
title = match[0].strip()
        url = f'https://www.x-mol.com{match[1].strip()[1:-1]}'  # strip surrounding quotes from the href capture (URL currently unused)
journal = match[2].strip()
impact_factor = match[3].strip()
pub_date = match[4].strip()
doi = match[5].strip()
authors = match[6].strip()
        abstract = re.sub(r'\s*\.\.\.', '', match[7].strip())  # drop the '...' truncation markers
        abstract = re.sub(r'<img src="https://static.x-mol.com/jcss/images/icon-oa.jpg"/> \s*', '', abstract)  # drop the open-access icon
elif source == 'pubmed':
title = match[0].strip()
journal = match[5].strip()
impact_factor = ""
pub_date = match[4].strip()
        doi = match[2].strip()[16:]  # drop the leading 'https://doi.org/' (16 chars) to keep the bare DOI
authors = match[3].strip()
abstract = match[1].strip()
article = {
"doi": doi,
"title": title,
"pub_date": pub_date,
"journal": journal,
"impact_factor": impact_factor,
"authors": authors,
"abstract": abstract
}
formatted_content = f"""✅《{title}》
ℹ️Journal: {journal} IF: {impact_factor}
ℹ️Date: {pub_date}
ℹ️Authors: {authors}
ℹ️DOI: https://doi.org/{doi}
ℹ️Abstract: {abstract}"""
return formatted_content, article
def send_to_wechat(content, key):
"""
Send the formatted content to WeChat.
"""
webhook_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={key}"
headers = {
"Content-Type": "application/json"
}
data = {
"msgtype": "text",
"text": {
"content": content
}
}
    response = requests.post(webhook_url, json=data, headers=headers, timeout=30)
    # The webhook returns HTTP 200 even on failure; errcode 0 in the JSON body means success
    if response.status_code == 200 and response.json().get("errcode") == 0:
        print("Message sent successfully to WeChat!")
    else:
        print(f"Failed to send message to WeChat. Status Code: {response.status_code}, Body: {response.text}")
def save_new_articles(items, source):
"""
    Collect articles that have not been processed yet and push each one to WeChat.
"""
global processed_links
new_articles = []
for item in items:
message, article = get_message_content(item, source)
if article["doi"] not in processed_links:
# print(f'{article["doi"]} 未保存,正在保存')
new_articles.append(article)
processed_links.append(article["doi"])
send_to_wechat(message, API_KEY)
return new_articles
def setup_table(bucket, items_xmol, items_pubmed):
"""
Delete and create the table for initial setup.
"""
file_name = f'{TABLE_NAME}.csv'
    # delete_object succeeds even when the key is absent, so check existence explicitly
    if bucket.object_exists(file_name):
        bucket.delete_object(file_name)
        print(f'Table {file_name} has been deleted.')
    else:
        print(f'Table {file_name} does not exist.')
# Create a new file with header
header = ["doi", "title", "pub_date", "journal", "impact_factor", "authors", "abstract"]
output = StringIO()
writer = csv.writer(output)
writer.writerow(header)
# Collect new articles
new_articles_xmol = save_new_articles(items_xmol, 'xmol')
new_articles_pubmed = save_new_articles(items_pubmed, 'pubmed')
all_new_articles = new_articles_xmol + new_articles_pubmed
for article in all_new_articles:
row = [article["doi"], article["title"], article["pub_date"], article["journal"], article["impact_factor"], article["authors"], article["abstract"]]
writer.writerow(row)
save_to_oss(bucket, file_name, output.getvalue().encode('utf-8'))
print(f'CSV file {file_name} has been created and uploaded to OSS.')
def main():
"""
Main function to execute the script.
"""
global processed_links
items_xmol = get_latest_items(XMOL_URL, 'xmol')
    print(f'Number of X-MOL RSS items fetched: {len(items_xmol)}')
items_pubmed = get_latest_items(PUBMED_URL, 'pubmed')
    print(f'Number of PubMed items fetched: {len(items_pubmed)}')
auth = oss2.Auth(OSS_ID, OSS_SECRET)
bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)
    # # Initialize the table and write the currently fetched items (first run only)
# setup_table(bucket, items_xmol, items_pubmed)
    # Load the links (DOIs) that have already been processed
processed_links = get_range(bucket)
    print('Number of links already processed:', len(processed_links))
    # Process the new articles
new_articles_xmol = save_new_articles(items_xmol, 'xmol')
new_articles_pubmed = save_new_articles(items_pubmed, 'pubmed')
all_new_articles = new_articles_xmol + new_articles_pubmed
print(f'{len(all_new_articles)} new articles found.')
    # Upload all new articles in a single batch
if all_new_articles:
put_rows(bucket, all_new_articles)
def handler(event, context):
"""
Handler function for the cloud function.
"""
logger = logging.getLogger()
main()
logger.info('Execution completed')
return 'Execution completed'
# if __name__ == "__main__":
# main()
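To deploy, point the function's entry point at `handler` (Alibaba Cloud Function Compute calls it with the standard `(event, context)` signature) and attach a time trigger so the feeds are polled on a schedule. For local testing, uncomment the `__main__` guard above and run the file directly.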