article,

Wechaty 实现 Milvus 社区群消息同步

shiyu22 shiyu22 Follow Jul 20, 2021 · 2 mins read
Wechaty 实现 Milvus 社区群消息同步

作者: shiyu22, 程序媛

Wechaty 实现 Milvus 社区群消息同步

背景

首先介绍下开发背景,Milvus 是一个开源的特征向量数据库,本着开源互助的精神,社区一直致力于帮助用户解疑答惑。企业微信聊天群是社区的交流地之一,基于群聊自动扩群功能下,每个群聊的人数上限是 200 人,所以 Milvus 开源一年半以来,累计了十个用户交流群。用户分散在多个群会发生一些交流或管理方面的问题:

  • 旧群交流少:十个交流群中之前的群中交流不多,因为该踩的坑都踩完了,老用户提问的频率不高。
  • 多个群问相似的技术问题:不同的用户在不同的群里问类似的问题,社区人员需要多次重复回答。
  • 社区重复发布新闻等消息:当产品发布新版本时,社区会发布新闻,这时需要在十个群里手动转发。
  • 技术交流覆盖不广泛:当一个群里交流技术难题,其他群的小伙伴无法参与或查看。 基于以上的这些问题,社区考虑做一个企业微信群间消息同步的机器人,开发群内的消息同步功能,方便群里用户的沟通与交流。

效果展示

针对前面提到的问题,社区计划实现一个可以同步群消息的机器人,之前了解到 Wechaty 可以实现这个功能,并且 Wechaty 的社区也部署了同步群消息的机器人。Wechaty 是一个开源的的对话机器人 SDK,它是一个使用 Typescript 构建的Node.js 应用。支持多种微信接入方案,包括网页,ipad,ios,windows, android 等。 基于 Wechaty 部署的机器人可以同步十个群的消息,效果展示如下面的截图,机器人负责转发其他群的消息,保证每个群可以同步,用户交流群的人就可以跨群交流了。接下来将介绍如何用 Wechaty 部署消息同步的机器人。 效果展示

Wechaty 的具体实现

  • 获取 Milvus 用户交流群的列表 为了同步多个群的消息,首先获取需要同步消息的群列表,这时主要通过群名来判断,如判断群的前缀是否为 ” Milvus 用户交流群“,此时 Wechaty 会返回一个 Room 列表。
async def on_login(self, contact: Contact):
    self.ROOM_LIST = ['Milvus用户交流群']
    roomlist = await self.Room.find_all()
    self.community_list = []
    for r in roomlist:
        name = await r.topic()
        # the group name like 'Milvus用户交流群101'
        if name[:-3] in self.ROOM_LIST:
            self.community_list.append(r)
  • 收到消息时转发到指定群 接下来就判断接受到的消息是否来自 Room 列表,如果是的话就将其转发到剩余的的几个 Room。
async def on_message(self, msg: Message):
    room = msg.room()
    if room:
        contact = msg.talker().name
        text = msg.text()
        group = await room.topic()
    
        # cheeck if the msg is from ROOM_LIST
        if group[:-3] in self.ROOM_LIST:
            # send the msg to other rooms and make sure not duplicated.
            for community in self.community_list:
                if room is community:
                    continue
                community.say(text)
  • 多种类型的信息转发 需要注意的是,群里会接收到多种类型的消息,如“文本”,“图片”,“公众号文章”和“小程序”等,那么可以在 Wechaty 中判断信息类型并发出消息。
msg_type = msg.type()
if msg_type==MessageType.MESSAGE_TYPE_IMAGE:
    image: Image = msg.to_image()
    hd_file_box: FileBox = await image.hd()
    await hd_file_box.to_file(file_path='./hd-image.jpg', overwrite=True)
    image_filebox = FileBox.from_file('./hd-image.jpg')
    await community.say(image_filebox)
elif msg_type==MessageType.MESSAGE_TYPE_URL:
    content = await msg.to_url_link()
    await community.say(content)
elif msg_type == MessageType.MESSAGE_TYPE_MINI_PROGRAM:
    mini_program = await msg.to_mini_program()
     if mini_program:
        await community.say(mini_program)
else:
    if text:
        send_text = '[{}@{}]:  {}'.format(contact, group[-3:], text)
        await community.say(send_text)

总结

最后,十分感谢 Wechaty 开源的 Python SDK 以及开源激励计划,可以十分方便的使用 Token 进行开发。而 Milvus 特征向量数据库同为开源项目,计划在下一版本中利用 Milvus 智能判断广告消息,预防非技术交流人员在群里发布广告等消息,期待下一版本的更新!

Join Newsletter
Get the latest news right in your inbox. We never spam!
Written by shiyu22
程序媛