个人情报舆情分析系统——微信公众号文章实时采集方法(一)
前言
本篇文章仅供个人学习参考使用,不用于任何商业用途。
在超过两千万的公众号中,假如每个公众号平均发文1000篇,合计500亿篇文章,每个中国人能分到36篇。这些文章在提供大量资讯、信息、知识的同时,已经渗透到了各行各业。其中,互联网、教育、金融的内容最多。细细深挖,总能发现鲜为人知的信息差,如果你了解公开情报分析 Open Source Intelligence 的原则,微信公众号的数据一定不能错过。
只要有一个微信账号,任何一个公众号的内容都可以查看,但是你真的会看吗?真的能看懂吗?真的有时间看吗?请承认,总有那么一小撮人,深谙这些方法和技巧,他们的思维方式和信息渠道完全吊打挤在同一条地铁中的其他人。不管是印刷术、互联网还是微信公众号实际上都加剧了信息不对称,请务必笃信这一点。
需求
正如前言所提到的,你可能已经关注了几百个微信公众号,其中藏匿着很多对你有价值的信息,如何更有效地聚合这些信息以供阅读或是进一步的处理便成了一个问题。我已经将「转发公众号文章给微信机器人以供后续处理」这步融入自己的工作流快两个月了,但随着使用时间变长、使用频次加大,使用范围扩宽,我愈发觉得这套系统依然不够高效,因为搜集资讯的执行者还是我本人,大量的时间花费在了翻阅各个公众号的操作逻辑中,这不优雅。因此我对这套系统有了新的需求,具体如下:
- 能够实时采集公众号资讯
- 自动对文章根据我指定的标签进行文本分类
- 自动进行关键词抽取、事件抽取
- 自动进行文本摘要
- ……
本篇文章是该系列的第一篇,以实现第一个需求为主。
实施
根据我目前想到的方法,简单列了一下目前用于获取微信公众号文章数据的方法:
- 使用第三方平台 API 获取
- 从搜狗微信搜索平台采集
- 订阅 RSS
- 基于微信公众号平台采集
- 利用自动化测试框架模拟点击
- 借助移动端构造请求采集
- 被动接受公众号消息采集
最近从 Clubhouse 学到了一个词,叫做「FOMO(Fear of Missing Out)」,即「害怕错过朋友圈里发生的事情」。我发现我对资讯的时间也比较敏感,所以我希望尽可能的实时,晚一天就不符合我的需求了,根据我的实践,只有最后一种方式满足我的需求。但为了描述探索的过程,我将依次介绍这些方法,省时间可以直接看最后一个方法:
使用第三方平台 API 获取
根据调研的结果,发现主要有:新榜、清博、拓途数据这些数据服务商,但在搜索过程中我也发现了一篇法院裁定禁止擅自爬取微信公众号数据的文章,这也是文章开头添加警示的原因。
- 新榜
根据新榜新媒体数据API,确实提供了不少能力,其中关于微信文章的 API 如下:
它的计费方式是自有的,注册试用送了 2000 unit,之后应该是 3000 元送 两万 unit,也就是说不带正文的公众号文章数据大概为 0.15元/条。根据我对 API 的测试,获取到的数据并不是实时的,应该有一天左右的延迟,加上收费,遂跳过。
- 清博
根据清博开放平台API文档,我只发现了头条账号、抖音账号、快手账号,遂跳过。清博的「微信文章采集」入口我以自己的小众公众号「xyzlabAI」作为实验,并没有任何数据,所以跳过。
- 拓途数据
拓途数据应该是对非开发者较友好的平台了,可以很方便的添加监控任务,不带正文的费用同样也是 0.15/篇:
但好像被暂停使用了:
根据拓途数据API文档看应该是挺符合我的使用需求的,但我并没有试用API,数据延迟方面没有结论。因为监控量大了之后,其实这也是一笔不小的费用,遂跳过。
从搜狗微信搜索平台采集
搜狗微信搜索平台从实时性上观测效果应该还可以,但是仅显示最近10条群发,遂跳过。
订阅RSS
RSS(Really Simple Syndication)中文名称简易信息聚合,广泛用于网上新闻频道、Blog 等订阅,使用 RSS 订阅能更快地获取信息,顺带一提的是,本博客也支持 RSS 订阅,可以点击这里以 RSS 订阅本博客。
说到 RSS 就不得不提一提 RSSHub,RSSHub 是一个开源、简单易用、易于扩展的 RSS 生成器,可以给任何奇奇怪怪的内容生成 RSS 订阅源,目前已适配数百家网站的上千项内容,这是一个非常好的工具,之后在扩展本个人情报舆情分析系统的时候,订阅大佬们的微博、Twitter 的消息时应该会采用该方法。
根据 RSSHub 的文档,RSSHub 提供了八种 RSS 订阅源,但目前应该只有三种可以使用了,其中一种还需要通过转发微信消息至 Telegram,故实际可以开箱即用的其实只有两种。分别是 CraeerEngine 来源与二十次幂来源。经测试,RSS 数据同样不是实时的,遂跳过。
基于微信公众号平台采集
在微信公众号平台其实有一个实时获取微信公众号资讯的接口的,这就需要拥有一个公众号,之后只需要在公众号中新建图文:
在新图文的页面中只需要选择编辑超链接,选择其他公众号就可以进行资讯的搜索并获取相应列表:
中间过程不再赘述,直接给出代码:
import time
import requests
import pandas as pd
from datetime import datetime, timedelta
def get_fakeid():
"""用于获取公众号的 fakeid
"""
df = pd.DataFrame()
content = pd.read_excel('公众号.xlsx') # 读取需要获取 fakeid 的公众号列表文件
search_params = {
'action': 'search_biz',
'begin': 0,
'count': 5,
'query': '',
'token': TOKEN,
'lang': 'zh_CN',
'f': 'json',
'ajax': 1,
}
headers = {
'cookie': COOKIE,
}
URL = 'https://mp.weixin.qq.com/cgi-bin/searchbiz'
for mp in content['NickName']:
search_params['query'] = mp
res = requests.get(URL, params=search_params, headers=headers).json()
print('{}:{}'.format(mp, res['list'][0]['fakeid']))
s = pd.Series({
'NickName': res['list'][0]['nickname'],
'fakeid': res['list'][0]['fakeid'],
})
df = df.append(s, ignore_index=True)
time.sleep(10)
df.to_excel('fakeid.xlsx', index=False) # 防止 API 被限制数据丢失,每轮循环保存一次
def get_mps(start, end):
"""用于获取指定日期范围的资讯列表
Args:
start (string): 起始日期, e.g. '2021-02-01'
end (string): 结束日期, e.g. '2021-02-04',自动延续到当天 23:59:59
"""
start = datetime.strptime(start, '%Y-%m-%d')
end = datetime.strptime(end, '%Y-%m-%d')
end = end + timedelta(hours=23, minutes=59, seconds=59)
start = start.timestamp()
end = end.timestamp()
df = pd.DataFrame()
content = pd.read_excel('公众号.xlsx') # 读取包含 fakeid 的待读取公众号列表
search_params = {
'action': 'list_ex',
'begin': 0,
'count': 5,
'fakeid': '',
'query': '',
'token': TOKEN,
'lang': 'zh_CN',
'f': 'json',
'ajax': 1,
}
headers = {
'cookie': COOKIE,
}
URL = 'https://mp.weixin.qq.com/cgi-bin/appmsg'
for mp in content['fakeid']:
search_params['fakeid'] = mp
res = requests.get(URL, params=search_params,
headers=headers).json()['app_msg_list']
articles = [{'标题': i['title'], '源链接': i['link'],
'摘要': i['digest'], '时间': i['create_time']} for i in res]
times = [i['create_time'] for i in res]
count = sum(start < i < end for i in times)
while count == len(times):
time.sleep(10)
search_params['begin'] += 5
res = requests.get(URL, params=search_params,
headers=headers).json()['app_msg_list']
articles.extend([{'标题': i['title'], '源链接': i['link'],
'摘要': i['digest'], '时间': i['create_time']} for i in res])
times = [i['create_time'] for i in res]
count = sum(start < i < end for i in times)
for article in articles:
s = pd.Series({
'标题': article['标题'],
'源链接': article['源链接'],
'摘要': article['摘要'],
'时间': datetime.fromtimestamp(article['时间']).strftime('%Y-%m-%d'),
})
df = df.append(s, ignore_index=True)
df.to_excel('公众号文章.xlsx', index=False) # 防止 API 被限制数据丢失,每轮循环保存一次
if __name__ == '__main__':
start = '2021-01-29'
end = '2021-02-04'
TOKEN = '' # 你的 TOKEN
COOKIE = '' # 你的 Cookie
get_fakeid()
get_mps(start, end)
代码中的 TOKEN
、COOKIE
只需要在浏览器的请求中获取并填入即可,首先需要获取到公众号对应的 fakeid,再模拟请求获取对应的资讯数据。根据测试结果,一百出头一点的公众号获取 fakeid 稍微快一点就会被限制(代码中换用每个公众号获取完 fakeid 等待 10 秒,没有遇到限制,但并没有测试该 API 访问总量的限制),所以这种方式仅仅只能作为一种备选方式。
另外这里简单说一下,微信平台中 fakeid 是一对一的,而 openid 是一对多的,即每一个公众号拥有唯一的 fakeid,但不同的用户关注同一个公众号会有不同的 openid。
利用自动化测试框架模拟点击
使用自动化测试框架控制客户端,模拟点击,获取参数,嫌麻烦,所以并没有写代码。
借助移动端构造请求采集
三四年前第一次实习,在参加公司内部 Hackathon 的时候,曾进行过微信公众号文章数据的采集,用于 AI 模型的训练数据的补充(比赛拿了第一名 LOL)。当时还采集了点赞量、评论量、阅读量、正文内容等数据,主要是基于中间人攻击的方式。现在偷懒直接买了一套源代码,看了一眼是结合 MongoDB 与 Elasticsearch 的 Flask 服务,结合 mitmproxy 与旧版微信客户端进行模拟请求,需要用户手动点击公众号(当然可以利用自动化测试框架,但懒得写了)历史列表以及正文数据,便可以获取公众号的历史数据了:
一开始还以为是我自己买的不是商用版的原因,导致只能一次性获取一个公众号的内容,后面问了作者,发现商业版一次也只支持一个公众号的采集。如果我每次采集都需要自己手动选择公众号,这其实反倒降低了我的效率,只能说工具的定位不同,需求不一样,这个工具更加偏向于公众号历史数据的采集,之后会考虑采用该工具来采集所需公众号的历史数据以及正文数据,搭建自己的知识库,方便 Elasticsearch 进行搜索。
被动接受公众号消息采集
其实以上所有的方法都需要自己主动去采集,虽然增加了主观能动性,但使用起来依然非常不爽,直接让他端上来不就好了吗,微信也是这么做的。本方法其实是微信个人号限制破局 —— 后 itchat 时代 Serverless 与飞书捷径的妙用的这篇文章的功能延续,既然微信客户端接受订阅的公众号的消息,那直接处理就可以了。在具体操作起来还是有一些坑的,这里梳理一下。依然从 autoReplyByAI
这个方法开始:
#pragma mark - Other
- (void)autoReplyByAI:(AddMsg *)addMsg
{
if (addMsg.msgType != 1) {
return;
}
NSString *userName = addMsg.fromUserName.string;
MMSessionMgr *sessionMgr = [[objc_getClass("MMServiceCenter") defaultCenter] getService:objc_getClass("MMSessionMgr")];
WCContactData *msgContact = nil;
if (LargerOrEqualVersion(@"2.3.26")) {
msgContact = [sessionMgr getSessionContact:userName];
} else {
msgContact = [sessionMgr getContact:userName];
}
if ([msgContact isBrandContact] || [msgContact isSelf]) {
// 该消息为公众号或者本人发送的消息
return;
}
YMAIAutoModel *AIModel = [[YMWeChatPluginConfig sharedConfig] AIReplyModel];
if (AIModel.specificContacts.count < 1) {
return;
}
[AIModel.specificContacts enumerateObjectsUsingBlock:^(NSString *wxid, NSUInteger idx, BOOL * _Nonnull stop) {
if ([wxid isEqualToString:addMsg.fromUserName.string]) {
NSString *content = @"";
NSString *session = @"";
if ([wxid containsString:@"@chatroom"]) {
NSArray *contents = [addMsg.content.string componentsSeparatedByString:@":\n"];
NSArray *sessions = [wxid componentsSeparatedByString:@"@"];
if (contents.count > 1) {
content = contents[1];
}
if (sessions.count > 1) {
session = sessions[0];
}
} else {
content = addMsg.content.string;
session = wxid;
}
[[YMNetWorkHelper share] GET:content session:session success:^(NSString *content, NSString *session) {
[[YMMessageManager shareManager] sendTextMessage:content toUsrName:addMsg.fromUserName.string delay:kArc4random_Double_inSpace(3, 8)];
}];
}
}];
}
可以看到代码中若是接收到了公众号或者本人发送的消息则直接 return
,因此 addMsg
中一定可以介绍到这些类型的消息。在一开始尝试的时候,我直接用 NSLog(@"消息内容:%@", addMsg);
的形式去打印的这些内容,当时也并没有注意字段的结尾,便下了草率的结论:微信客户端接受公众号消息只接受第一条,需要点进去之后才会获取剩余的内容,便于节省流量。第二天我想想不对劲,问了专门做逆向的同学,说他朋友弄过安卓的。但由于安卓不是我的主力机,我也不再细问,往 Xcode 控制台瞟了一眼发现打印的字符串结尾像是被突然截断了一样,让我觉得他并没有打印完全。
我问了做 iOS 开发的同学,他来了一句:「怎么可能?」之后我在开头加上了一段宏:
#ifdef DEBUG
#define NSLog(FORMAT, ...) fprintf(stderr, "%s:%zd\t%s\n", [[[NSString stringWithUTF8String: __FILE__] lastPathComponent] UTF8String], __LINE__, [[NSString stringWithFormat: FORMAT, ## __VA_ARGS__] UTF8String]);
#else
#define NSLog(FORMAT, ...) nil
#endif
然后 NSLog 就可以将这些信息完全输出出来了:微信客户端在接受订阅公众号的消息时,是以 XML 形式接受本次推送的所有文章的。接下来只需要修改相关部分的代码即可(我之前从来没有写过 Objective-C,轻喷):
#pragma mark - Other
- (void)autoReplyByAI:(AddMsg *)addMsg
{
if (addMsg.msgType != 1 && addMsg.msgType != 49) {
return;
}
NSString *userName = addMsg.fromUserName.string;
MMSessionMgr *sessionMgr = [[objc_getClass("MMServiceCenter") defaultCenter] getService:objc_getClass("MMSessionMgr")];
WCContactData *msgContact = nil;
if (LargerOrEqualVersion(@"2.3.26")) {
msgContact = [sessionMgr getSessionContact:userName];
} else {
msgContact = [sessionMgr getContact:userName];
}
if ([msgContact isSelf]) {
//该消息为公众号或者本人发送的消息
return;
}
YMAIAutoModel *AIModel = [[YMWeChatPluginConfig sharedConfig] AIReplyModel];
// 监听关注的公众号消息
if([msgContact isBrandContact]){
if ([addMsg.content.string hasPrefix:@"<msg>"]){
NSError *error = nil;
NSDictionary *xmlDict = [XMLReader dictionaryForXMLString:addMsg.content.string error:&error];
NSDictionary *msgDict = [xmlDict valueForKey:@"msg"];
NSDictionary *appMsgDict = [msgDict valueForKey:@"appmsg"];
NSDictionary *mmreaderDict = [appMsgDict valueForKey:@"mmreader"];
NSDictionary *categoryDict = [mmreaderDict valueForKey:@"category"];
NSArray *items = [categoryDict valueForKey:@"item"];
NSMutableArray *mps = [[NSMutableArray alloc] init];
if (items.count > 20) {
NSString *title = @"";
NSString *url = @"";
NSString *pub_time = @"";
NSString *digest = @"";
NSString *source = @"";
NSDictionary *titleDict = [items valueForKey:@"title"];
title = [titleDict valueForKey:@"text"];
NSDictionary *urlDict = [items valueForKey:@"url"];
url = [urlDict valueForKey:@"text"];
NSDictionary *pub_timeDict = [items valueForKey:@"pub_time"];
pub_time = [pub_timeDict valueForKey:@"text"];
NSDictionary *digestDict = [items valueForKey:@"digest"];
digest = [digestDict valueForKey:@"text"];
NSDictionary *sourcesDict = [items valueForKey:@"sources"];
NSDictionary *sourceDict = [sourcesDict valueForKey:@"source"];
NSDictionary *nameDict = [sourceDict valueForKey:@"name"];
source = [nameDict valueForKey:@"text"];
NSDictionary *article = @{
@"title": title,
@"url": url,
@"pub_time": pub_time,
@"digest": digest,
@"source": source,
};
[mps addObject:article];
} else {
for (id item in items) {
NSString *title = @"";
NSString *url = @"";
NSString *pub_time = @"";
NSString *digest = @"";
NSString *source = @"";
NSDictionary *titleDict = [item valueForKey:@"title"];
title = [titleDict valueForKey:@"text"];
NSDictionary *urlDict = [item valueForKey:@"url"];
url = [urlDict valueForKey:@"text"];
NSDictionary *pub_timeDict = [item valueForKey:@"pub_time"];
pub_time = [pub_timeDict valueForKey:@"text"];
NSDictionary *digestDict = [item valueForKey:@"digest"];
digest = [digestDict valueForKey:@"text"];
NSDictionary *sourcesDict = [item valueForKey:@"sources"];
NSDictionary *sourceDict = [sourcesDict valueForKey:@"source"];
NSDictionary *nameDict = [sourceDict valueForKey:@"name"];
source = [nameDict valueForKey:@"text"];
NSDictionary *article = @{
@"title": title,
@"url": url,
@"pub_time": pub_time,
@"digest": digest,
@"source": source,
};
[mps addObject:article];
}
}
NSError *parseError = nil;
NSData *jsonData = [NSJSONSerialization dataWithJSONObject:mps options:NSJSONWritingPrettyPrinted error:&parseError];
NSString *str = [[NSString alloc] initWithData:jsonData encoding:NSUTF8StringEncoding];
// NSLog(@"格式化内容:%@", str);
[AIModel.specificContacts enumerateObjectsUsingBlock:^(NSString *wxid, NSUInteger idx, BOOL * _Nonnull stop) {
[[YMNetWorkHelper share] GET:str session:wxid success:^(NSString *content, NSString *session) {
}];
}];
} else {
return;
}
};
if (AIModel.specificContacts.count < 1) {
return;
}
[AIModel.specificContacts enumerateObjectsUsingBlock:^(NSString *wxid, NSUInteger idx, BOOL * _Nonnull stop) {
if (addMsg.msgType == 1) {
if ([wxid isEqualToString:addMsg.fromUserName.string]) {
NSString *content = @"";
NSString *session = @"";
if ([wxid containsString:@"@chatroom"]) {
NSArray *contents = [addMsg.content.string componentsSeparatedByString:@":\n"];
NSArray *sessions = [wxid componentsSeparatedByString:@"@"];
if (contents.count > 1) {
content = contents[1];
}
if (sessions.count > 1) {
session = sessions[0];
}
} else {
content = addMsg.content.string;
session = wxid;
}
[[YMNetWorkHelper share] GET:content session:session success:^(NSString *content, NSString *session) {
[[YMMessageManager shareManager] sendTextMessage:content toUsrName:addMsg.fromUserName.string delay:kArc4random_Double_inSpace(3, 8)];
}];
}
} else if (addMsg.msgType == 49) {
if ([wxid isEqualToString:addMsg.fromUserName.string]) {
NSString *msgContentStr = nil;
NSString *session = @"";
if ([addMsg.fromUserName.string containsString:@"@chatroom"]) {
NSArray *msgAry = [addMsg.content.string componentsSeparatedByString:@":\n<?xml"];
NSArray *sessions = [wxid componentsSeparatedByString:@"@"];
if (msgAry.count > 1) {
msgContentStr = [NSString stringWithFormat:@"<?xml %@",msgAry[1]];
} else {
msgAry = [addMsg.content.string componentsSeparatedByString:@":\n<msg"];
if (msgAry.count > 1) {
msgContentStr = [NSString stringWithFormat:@"<msg%@",msgAry[1]];
}
if (sessions.count > 1) {
session = sessions[0];
}
}
} else {
msgContentStr = addMsg.content.string;
session = wxid;
}
NSString *url = @"";
NSString *title = @"";
NSError *error;
NSDictionary *xmlDict = [XMLReader dictionaryForXMLString:msgContentStr error:&error];
NSDictionary *msgDict = [xmlDict valueForKey:@"msg"];
NSDictionary *appMsgDict = [msgDict valueForKey:@"appmsg"];
NSDictionary *titleDict = [appMsgDict valueForKey:@"title"];
title = [titleDict valueForKey:@"text"];
NSDictionary *urlDict = [appMsgDict valueForKey:@"url"];
url = [urlDict valueForKey:@"text"];
NSDictionary *content = @{
@"title":title,
@"url":url
};
NSError *parseError = nil;
NSData *jsonData = [NSJSONSerialization dataWithJSONObject:content options:NSJSONWritingPrettyPrinted error:&parseError];
NSString *str = [[NSString alloc] initWithData:jsonData encoding:NSUTF8StringEncoding];
[[YMNetWorkHelper share] GET:str session:session success:^(NSString *str, NSString *session) {
[[YMMessageManager shareManager] sendTextMessage:str toUsrName:addMsg.fromUserName.string delay:kArc4random_Double_inSpace(3, 8)];
}];
}
}
}];
}
自此,插件就会将接收到的订阅公众号消息自动转发到之前指定的 API 地址,API 地址中也需要进行变动。之前是通过飞书捷径与飞书文档、Quip 的结合来进行数据的存储的,但这次转发所有的订阅公众号数据量应该比较大,预计一周数据量在一千条以上,于是此次引入了 Leancloud,同样也是本博客的评论、阅读量的后端系统,主要是用来接受、存储这些文章数据。之前的文章忘了写 Serverless 这边的函数,这边补充一下这次相关的:
# -*- coding: utf8 -*-
import re
import json
import requests
import leancloud
from datetime import datetime
leancloud.init("{{应用AppID}}", "{{应用Key}}")
TOKEN = 'Bearer {{飞书文档TOKEN}}'
headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Authorization': TOKEN}
def main_handler(event, context):
print("Received event: " + json.dumps(event, indent = 2))
print("Received context: " + str(context))
CATCH_URL = '' # 飞书捷径 webhook 地址
question = event['queryString']['question']
data = None
try:
question = json.loads(question)
except:
return None
if isinstance(question, list):
for i in question:
post_data = {
'pub_time': datetime.fromtimestamp(int(i['pub_time'])).strftime('%Y-%m-%d %T'),
'title': i['title'],
'digest': i['digest'],
'url': i['url'],
'source': i['source'],
}
r = requests.post(CATCH_URL, json=post_data) # 转存至飞书文档
mps = leancloud.Object.extend('mps')
mps = mps()
mps.set("title", i['title'])
mps.set("digest", i['digest'])
mps.set("url", i['url'])
mps.set("pub_time", datetime.fromtimestamp(int(i['pub_time'])))
mps.set("source", i['source'])
mps.save()
return data
自此,就可以在 Leancloud 数据仓库中获取到实时的微信公众号文章数据了,当然这些数据字段是根据我自己的数据字段制定的,可以在代码里面改动进行调整。
代码部分是完成了,但是需要一台 Mac 作为服务器,恰好我有几台 Mac 常年吃灰,可以拿出来当服务器用。在 MacBook 关闭屏幕的时候,网络是会休眠的,这就不能做到 7 × 24 的资讯采集了,所以我下载了个 Amphetamine,用于保持长时间的唤醒。目前经过几天的测试下来,可以正常地进行资讯的采集。
下一步
最后的方法目前只能实时获取到自己已经订阅的公众号消息,若要新订阅一个账号只能自己手动关注,下一步应该会考虑实现转发公众号名片给微信机器人则自动关注的功能。
另外 MacBook Pro 一直运行着当服务器用感觉也不太优雅,之后会考虑购买个 Mac Mini 或是直接迁移到 AWS 的云端 macOS 实例。
目前该系统完成的仅仅是数据仓库的搭建,之后应该会对其进行展示、筛选,结合移动端构造请求采集历史数据,完善数据仓库。之后再实现自动对文章根据指定的标签进行文本分类、自动进行关键词抽取、事件抽取、自动进行文本摘要等。这应该是个系列文章,但下一篇不知道什么时候可以更新。
- 感谢您的赞赏