@ TOC
项目现状目前,常见的IM系统传输报文无外乎使用UDP、TCP以及应用层的HTTP这几种协议。市面上象微信、MSN、陌陌、米聊、环信等大多采用TCP协议,只有QQ比较特殊,采用了UDP协议,应该是历史原因造成的,可能与当时的网络条件和初始资源有关。
UDP协议UDP协议提供了一种不可靠的无连接数据包传输服务。它不提供报文到达的确认、排序、及流量控制等功能。本身设计比较简洁,数据包较小,无需确认等特点,所以传输效率极高,比较适合应于流媒体类型的业务,这些业务对于少量数据包的损失不敏感。但对于IM系统来讲,对数据的完整性要求高、传输有序,直接使用UDP协议就不合适。如要使用就必须在UDP协议基础上再增加校验、重发机制。
TCP协议TCP协议提供了一种可靠的有连接数据包传输服务。它解决了UDP协议对于报文到达的确认、排序、及流量控制等方面的缺陷,能够保证可靠地数据传输。但它的缺点是传输效率要比UDP低,对于服务端大量数据的处理,需要耗费较高的资源,尤其是在长连接情况下,如何保证单机服务器高并发量,如何灵活地进行水平扩展都需要做好设计。
HTTP协议有的IM采用了TCP之上的应用层HTTP协议的传输,但一般对接的是Web客户端。还有的是作为一项辅助功能,提供第三方访问的web接口。
结论我们的IM产品将采用TCP传输协议收发报文,以后会增加HTTP协议作为开放接口。
以下的测试工作完全基于golang代码进行的测试。为什么使用golang而不是c/c++或者java呢?选择golang主要基于以下因素的考虑:
对于IMServer来讲,首先满足单服务器能够抗住足够多的并发的Tcp长连接。golang 底层通讯模型,windows默认使用了IOCP机制,linux默认使用了epoll模型,均能抗住高并发。
IMClient根据日常使用场景,设计了每1分钟发送500个字节数据,后接收server推送的500个字节数据,以此循环进行,并保持长连接。IMServer接收数据,并原样回送数据。测试环境中,Server使用了一台低配PC,2核CPU,8G内存,操作系统为centos7/Windows。
每个客户端开启了6000个连接,为了模拟更多的并发,同时开启了8-10个客户端,模拟5万长连接用户使用IM服务,进行压力测试。
当所有连接正常开启以后,用telnet单独连接服务器,发送数据和回送数据依然正常,说明在当前并发情况下,系统正常运行,符合预期。
单台低配服务器达到5万已经足够,无需接受更多的长连接。如果用户超过5万长连接,可以采用水平扩展方式,加入更多的服务器,进行负载均衡即可。
单机在linux统计长连接总数: 加载到5w长连接后,使用telnet连接服务器,查看服务情况,依然正常接受请求,回送数据:
客户端代码 im_client.go
package main import ( "fmt" "net" "time" "strconv" ) const ( //根据自身情况修改服务器ip地址 serverAddr = "192.168.1.36:120" ) func main() { succCount := 0 failCount := 0 for i:=0; i<6000; i++ { conn, err := net.Dial("tcp", serverAddr) if err != nil { failCount++ fmt.Println("Number of dialing failures:", failCount) time.Sleep(time.Second) continue } defer conn.Close() fmt.Println("Number of successful connections:", succCount ) go Client(conn, succCount ) succCount ++ } //Stop here to prevent the program from exiting tick := time.NewTicker(10 * time.Second) for { select { case <-tick.C: } } } func Client(conn net.Conn, index int) error { dataRead := make([]byte, 512) dataWrite := strconv.Itoa(index) + "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890/n" + "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890/n" + "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890/n" + "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890/n" + "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890/n" for { conn.Write([]byte(dataWrite)) _, err := conn.Read(dataRead) if err != nil { return err } time.Sleep(60*time.Second) } return nil }
linux下运行前,必须重新设置可打开的最大文件句柄数,默认为1024,也就是最大只能接受1024个连接数。我们设置为65330:
服务器端代码 im_server.go
package main import ( "net" "log" "time" ) const ( SERVER_IP = "" SERVER_PORT = "120" ) type Server struct { } func (server *Server) Start() { serverAddr := SERVER_IP + ":" + SERVER_PORT listener, err := net.Listen("tcp", serverAddr) if err != nil { log.Println("Failed to execute function net.Listen: ", err.Error()) return } defer listener.Close() for { conn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { log.Println("Continue accepting connection because of an temporary error: ", err.Error()) time.Sleep(1 *time.Second) continue } log.Println("Failed to execute function listener.Accept, exit app because of an error: ", err.Error()) return } sess := &Session { Conn: conn, } go sess.Serve() } } func main() { (&Server{}).Start() }
服务器端代码 im_session.go
package main import ( "bufio" "net" "log" "fmt" ) type Session struct { Conn net.Conn Reader *bufio.Reader Writer *bufio.Writer } func (session *Session) Serve() { defer session.Close() session.Reader = bufio.NewReader(session.Conn) session.Writer = bufio.NewWriter(session.Conn) for { byteData, err := session.Reader.ReadSlice('/n') if err != nil { log.Println("Error occured in funciton Session.Serve: ",err.Error() ) return } line := string(byteData) fmt.Println(line) session.Send(line) } } func (session *Session) Close() { session.Conn.Close() } func (session *Session) write(data string) error { if _, err := fmt.Fprint(session.Writer, data); err != nil { return err } return session.Writer.Flush() } func (session *Session) Send(data string) error { return session.write(data) }