简单用socket.io撸个聊天系统(1)——服务端

去年有个私活,大概是做一个物品交易系统,其中交易界面很类似聊天界面,甲方的明确需求是不允许用第三方IM接入,不过第三方IM确实寄人篱下,每年要付费而且这个交易系统需要一定安全性,而其实这个所谓的聊天系统也没那么复杂,不是发文字图片视频语音那种传统聊天需求,其实只是发一些特殊的JSON,客户端解析成一个个交易小框框而已,不过路是一样的,把“特殊的JSON”换成文字图片也是一样的。


想了想,还是从头到尾动手撸一个聊天系统比较好,当然也不能完全造轮子,客户端是uniapp,高端版套壳浏览器,所以核心还是websocket那套玩意,基于这之上的轮子比较阳间的就是socket.io,于是整个架构的技术选型就确定了。

服务器:netty socketio

客户端:uniapp,由于uniapp里的websocket不是浏览器那套,所以直接用socket.io的js是不行的,不过好在有人封装了一套轮子,弄了个中间层,让uniapp也能用上socket.io,就是这个→@hyoga/uni-socket.io,实测还是挺好用暂时没看到什么bug,可以用在正式工程里。

服务器的语言是groovy,缩写版java,语法大致差不多,就算没学过也能差不多看懂的程度。

在正式写代码前我们得先想一个事情,我们对消息的可靠性要求是多少?我们要根据项目需求来制定一个软件的架构。

是发过去就不管了,还是要客户端“确认收到(ack)”一次?是无论在线离线都发一下,发不出去无所谓,还是要保证消息不允许丢,拥有个离线存储功能,当客户端上线后去把之前存起来的消息发出去等等。

不巧,这个项目什么都需要,消息要保证最可靠,而且离线消息也要有保存。

那首先我们先解决消息存储的问题,在netty-sockio的github页面上,可以看到支持三种客户端存储方式(Memory, RedissonHazelcast),不巧当时比较着急hhh,并没有太仔细看文档,没有看到有多个消息存储,只以为是无消息存储或暂存在内存里,所以手动写了个消息存在redis里的轮子,好在代码其实也不多hhh

我们先制定个消息格式,在redis里我们用 hash类型 来存我们的消息,其中hash的value是我们的消息,消息可以是字符串,可以是JSON或者任何东西,看你的具体需求,而value对应的key是一串随机不重复的字符串,用来标识这条消息,方便我们的“确认收到(ack)”代码的设计,这个后面再讲。

而对应redis里的key就是我们的用户ID,总的来说就是我们给每个UID分配一个hash表来存消息,弄清关系后,我们就可以先写一个消息存储类了,用来管理消息

import groovy.transform.CompileStatic
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.stereotype.Component
import team.rpsg.vitem.enums.MessageType
import team.rpsg.vitem.util.Token

import java.util.concurrent.TimeUnit

@CompileStatic
@Component
class Message {

   @Autowired
   RedisTemplate<String, Object> redis

   @Autowired
   SocketService socket //这个类是我们接下来要实现的,用来收发消息

   final static int EXPIRE_DAY = 180 //指定个消息过期时间,过期从redis里删除


   /**
    * 获取某个UID堆积的全部消息
    */
   List<Map> total(long roleId) {
      def total = redis.boundHashOps(key(roleId))

      def result = []

      for(def key in total.keys())
         result << [key: key, value: total.get(key)]

      return result
   }


   /**
    * 给两个人发消息(因为只有一对一聊天,如果需要多个聊天,可以参考socket.io的room实现,用room进行群发)
    */
   void sendDual(long senderRole, long receiverRole, String message) {
      send(senderRole, receiverRole, message)
      send(receiverRole, senderRole, message)
   }


   /**
    * 给某个人发消息,发送者、接受者和消息,
    * 注意客户端可能未在线,但无论如何我们先把消息存到redis里再说,然后尝试发消息
    */
   void send(long senderRole, long receiverRole, String message) {
      def total = redis.boundHashOps(key(receiverRole))

      //其中Token.get()就是返回一个随机不重复的UUID,可以自行设计
      total.put(senderRole + "|" + Token.get(), message)
      total.expire(EXPIRE_DAY, TimeUnit.DAYS)

      //尝试发消息,发不发的出去得看客户端是否在线,具体sendTotal方法在下方有设计
      socket.sendTotal(roleId, total(roleId))

   }



   static String key(long id1){
      "vi_msg:${id1}"
   }


   /**
    * 清除消息,当客户端“确认收到”后我们把消息从redis里删掉以免下次重复发送。
    */
   void clear(long roleId, List<String> keys = null) {
      if(keys == null) {
         redis.delete(key(roleId))
      } else {
         def total = redis.boundHashOps(key(roleId))
         total.delete(keys.stream().toArray(String[]::new))
      }
   }
}

项目是没有聊天记录需求的,聊天记录只存在手机本地,删掉就彻底没有了,不过因为消息是我们“管控”的,所以在这之上加个聊天记录也是没问题的,我们把clear方法改成逻辑删除,同时total方法我们读取redis里的聊天记录时避开那些被删除的聊天记录即可。(但是注意redis的容量太大导致性能问题,我们可以在clear里把聊天记录转移到传统数据库里进行冷藏,读取聊天记录时从数据库里读取即可。)

那么接下来我们要正式接入socketio框架了,在gradle里加入引用(maven同理)

compile group: 'com.corundumstudio.socketio', name: 'netty-socketio', version: '1.7.18'

在写之前我们还需写一个配置类,来自动初始化一个SocketIO对象。

@CompileStatic
@Configuration
class SocketIOConfig {

   //从spring配置文件里读取,或者写死,随意
   @Value("\${socket.host}")
   String host

   @Value("\${socket.port}")
   Integer port


   // 注意这是groovy语法,java就用configuration.setXXXX()方法来设置即可。
   @Bean
   SocketIOServer socketIOServer() {
      return new SocketIOServer(new com.corundumstudio.socketio.Configuration(
            socketConfig: new SocketConfig(
                  tcpNoDelay: true, //开启即可,详细查文档
                  soLinger: 0
            ),
            hostname: host, //主机,一般为0.0.0.0
            port: port, //端口,随意,这里是10246
            bossThreads: 1, //主线程数
            workerThreads: 100, //线程数
            allowCustomRequests: true, 
            upgradeTimeout: 100000, //升级时间,和socket.io的协议升级有关,详情查文档
            pingTimeout: 60000, //ping超时时间,单位ms,socket.io会内置一个ping功能,定时往服务器发一条“ping”消息,服务器会返回一条“pong”代表收到,如果这个ping超时则认为socket已关闭。
            pingInterval: 25000 //ping频率,单位ms,注意必须比pingTimeout小
      ))
   }
}

我们建立一个SocketService类,并注册为Spring的Service。

我们依赖的Spring的@PostConstruct@PreDestroy注解,把他们挂到两个方法上,前者是该类被初始化后调用,后者是系统服务关闭前被调用,最终代码如下。

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.springframework.stereotype.Service

@Service
@CompileStatic
@Slf4j
class SocketService {

   @Autowired
   SocketIOServer socket //引用socketIO

   @Autowired
   Message message

   @PostConstruct
   start() {
      //启动socket
   }

   @PreDestroy
   stop() {
      if (socket)
         socket.stop()
   }

}

那么一步一步来,我们开始写start方法。

客户端连上我们的我们要给socket变量添加两个监听器,connect和disconnect,socket不能说连就连,我们得有一个auth过程,这个就在connect里实现。

//客户端连接时需要带上query,如192.168.0.56/socket.io?token=123,我们写个辅助方法来从连接里获取某个query
private static String getClientParam(SocketIOClient client, String param) {
   if(!client)
      return null

   def tokens = client.handshakeData.urlParams.get(param)

   if(!tokens)
      return null

   return tokens[0]
}
//在start()方法内写下

socket.addConnectListener { SocketIOClient it ->
   def token = getClientParam(it, "token")
   if(!token) { //如果客户端query里不带token则关闭连接。
      it.disconnect()
      return
   }

   def user = auth.of(token)
   def roleId = roleId(it)

   //验证token,如果失败就关掉这个连接。这里自行替换验证逻辑即可。
   if(!user || user.expired || user.currentRoleId != roleId) {
      it.disconnect()
      return
   }

   //把房间名为UID内的其他成员踢掉,下面讲
   socket.getRoomOperations(roleId as String).disconnect()
   //加入房间,房间号为UID
   it.joinRoom(roleId as String)

   //当连接成功后,从message里读取离线的消息并发送给客户端。其中message为刚才实现的消息暂存类。
   def totalMessage = message.total(user.currentRoleId)
   sendTotal(user.currentRoleId, totalMessage)
}

//最后别忘记start服务。
socket.start()

在国内的java socket.io文章里可以看到他们喜欢用一个静态Map对象来存放当前系统的全部客户端成员,但是这样是有很大的隐患,首先我们一定要记住,聊天系统是你爹,你驾驭不住的。即便是socket.io也是会不靠谱抽风的,网络波动、服务器即便断开连接但客户端还在连接或反过来等各种原因都是可能有的,所以我们完全没必要脱裤子放屁自己维护一个客户端集合的状态,很有可能在这个Map里找不到对应成员,或者对应成员在线状态有异常,既不靠谱还有可能出未知问题。

socket.io拥有room设计,顾名思义就是房间,我们巧妙的利用room系统,把每个UID放到一个房间里,房间里只有他自己,而我们想查询某个用户是否在线,只要看看房间里人数是否>=1即可,同样的我们想给某个特定用户发消息就对这个房间发消息即可。

而想知道在线人数也很简单,完全没必要自己维护一个计数器,直接获取全部客户端数量就可以了。

socket.getAllClients().size()

作为码农我们要记住把事情简单化,别overengineering,有时候写一堆废话代码反而费力不讨好。

接下来我们注册一个事件,就取名为eat,客户端接收到服务器消息,并且处理完成后,向服务端发送一个eat事件,内容为处理完成的消息ID,这个消息ID其实就是我们刚才说到的redis hash表的消息key。我们拿到消息ID后就调用刚才的message.clear来删除已读消息。

socket.addEventListener("eat", String.class, (SocketIOClient client, String data, AckRequest ackSender) -> {
   //这里是本项目的逻辑,
   //简单来说就是客户端发一个/多个消息ID,服务器解析后调用message.clear方法清除redis里的消息,
   //至于data是JSON还是字符串逗号隔开都可以自行设计。
   def json = JSON.toMap(data)

   if(json.type == "total") {
      message.clear(roleId(client), json.values as List<String>)
   }
})


我们在实现一个方法,用来判断某个用户是否在线。

根据我们刚才的room设计,其实判断名为UID的房间里客户端数量就可以了。

boolean hasClient(long roleId) {
   return socket.getRoomOperations(roleId as String).clients.size() > 0
}


好,我们已经接近真理了,就差最后一步,给某个客户端发送消息,其实我们拆开两个类来做,Message类负责对接redis进行消息存储,而SocketService类负责对接socket.io客户端进行消息收发,那在SocketService这里我们“给某个客户端发消息”,其实就是“把堆积的消息读出来发给客户端”,我们不能保证这个方法的消息是否真的发到客户端了,因为客户端可能离线,所以这个方法可以理解为“尝试给某个客户端发消息”。

//给client发消息,消息为List<Map>类型,Map参考Message.send()
//返回是否发送成功
boolean sendTotal(long roleId, List<Map> message) {

   //如果消息为空或者客户端离线,那就取消发送。
   if(!hasClient(roleId) || message.isEmpty()) {
      return false
   }

   //通过房间获取client
   def room = socket.getRoomOperations(roleId as String)

   //发送消息,事件名随便起一个,这里叫total,客户端对接相应事件名即可。
   room.sendEvent("total", JSON.stringify(message))
}


到此为止我们的简单但实用的服务端代码就算完成了,但注意这个sendTotal方法不是给你直接调用的,而是给message.send()用的,我们如果想给某个用户发消息,需要调用message.send(),它首先负责把消息存在redis,然后调用本方法尝试给客户端发消息,如果客户端不在线就不发送,没关系,客户端主动上线时会触发我们的connect方法,我们在那里也写了一个sendTotal方法来把堆积消息发给客户端,最后客户端收到消息并处理后,在调用eat事件,我们服务端在redis把消息删除,一切就是这么的无懈可击……

……但真的是这样吗,其实服务端代码确实到此为止了倒不用怕,但是还是那句话,聊天系统是你爹,你驾驭不住的。真正恶心人的是客户端,我们下一章开始在uniapp上写一个客户端,同时体验并解决一下消息抽风、网络波动、重发、明明注册一个客户端最后抽风出多个客户端等问题。

One Response to “ 简单用socket.io撸个聊天系统(1)——服务端 ”

  1. 有个aio的通信框架好像很厉害

发表评论