JGroups组播聊天Chat入门实例

JGroups组播聊天Chat入门实例

🗨

      JGroups是一个开源的纯java编写的可靠的群组通讯工具。其工作模式基于IP多播,但可以在可靠性和群组成员管理上进行扩展。其结构上设计灵活,提供了一种灵活兼容多种协议的协议栈。

      JGroups 多线程的方式实现了多个协议之间的协同工作,常见工作线程有心跳检测,诊断等等。

      JGroups实现多机器之间的通信一般都会包含维护群组状态、群组通信协议、群组数据可靠性传输这样的一些主题。

      JGroups群组的各个节点是存在"管理节点"的,至少可以说某个节点提供了在一段时间内维护状态信息和消息可靠性检测的功能(一般是最先启动的节点)。

    JGroups是一个用于建立可靠的组播通信的工具包(这里指的组播并不一定是IP Multicast,JGroups同样支持使用TCP作为传输协议)。其中可靠性是指通过适当的配置可以保证:消息在传输的过程中不会丢失;所有的接收者以相同的顺序接受所有的消息;原子性:一个消息要么被所有的接收者接收,要么不被任何一个接收者都接收。目前在JBoss Application Server Clustering,OSCache Clustering,Jetty HTTP session replication,  Tomcat HTTP session replication中都使用了JGroups。

  Unreliable Reliable
Unicast UDP TCP
Multicast IP Multicast JGroups

    TCP和UDP是单播(Unicast)协议,也就是说:发送者和每一接收者之间是点对点传输。 如果一个发送者希望向多个接收者传输相同的数据,那么必须相应的复制多份数据。TCP是可靠的传输协议,但UDP不是可靠的,也就是说报文在传输的过程中可能丢失、重复或着乱序,报文的最大尺寸也有限制。IP Multicast可以将消息同时发送到多个接收者。由于IP Multicast是基于UDP的,因此IP Multicast是不可靠的。IP Multicast需要一个特殊的组播地址,它是一组D类IP地址,范围从224.0.0.0 到 239.255.255.255,其中有一部分地址是为特殊的目的保留的。JGroups使用UDP (IP Multicast)、TCP、JMS作为传输协议。JGroups最强大的功能之一是提供了灵活的,可定制的协议栈,以满足不同的需求。例如,如果选择使用IP Multicast作为传输协议,那么为了防止报文丢失和重复,可以在协议栈中添加NAKACK协议;为了保证报文的顺序,可以在协议栈中添加TOTAL协议,以保证FIFO的顺序;为了在组内成员发生变化时得到通知和回调,可以添加Group Membership Service (GMS) 和 FLUSH协议;Failure Detector (FD)协议用于识别组内崩溃的成员;如果新加入的成员希望获得组内其它成员维护的状态,那么可以向协议栈中添加STATE_TRANSFER协议;如果希望对传输的数据进行加密,那么可以使用CRYPT协议等等。

    JGruops的主要功能有:

  • 组的创建和删除。组可以跨越LANs或者WANs。
  • 加入组、主动或者被动(例如当机或者网络故障)离开组。
  • 在组成员改变时,组中其它成员可以得到通知。
  • 向组中的单个或者多个成员发送消息。

   在JGroups中JChannel类提供了主要的API ,用于连接到集群(cluster)、发送和接收消息(Message)和注册listeners等。Message包含消息头(保存地址等信息)和一个字节数组(保存希望传输的数据)。org.jgroups.Address接口及其实现类封装了地址信息,它通常包含IP地址和端口号。连接到集群中的所有实例(instance)被称为一个视图(org.jgroups.View)。通过View.getMembers()可以得到所有实例的地址。实例只有在连接到集群后才能够发送和接收消息。以相同name调用JChannel.connect(String name)方法的所有实例会连接到同一个集群。当实例希望离开集群时,可以调用JChannel.disconnect()方法。当希望释放占有的资源时,可以调用JChannel.close()方法。JChannel.close()方法内部会调用JChannel.disconnect()方法。
    通过调用JChannel.setReceiver()方法可以接收消息和得到View改变的通知。每当有实例加入或者离开集群的时候,viewAccepted(View view)方法会被调用。View.toString()方法会打印出View中所有实例的地址,以及View ID。需要注意的是,每次viewAccepted(View view)方法被调用时,view参数都不同,其View ID也会增长。View内的第一个实例被称为coordinator。Receiver接口上的getState(),setState()方法用于在实例间传递状态。新的实例通过setState()方法获得通过状态,而这个状态是通过调用集群中其它某个实例上的getState()获得的。

 

入门Demo:

 

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.List;

import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;

public class SimpleChat extends ReceiverAdapter {


@Override
public void receive(Message msg) {
        super.receive(msg);
        System.out.println(msg.getSrc() + ": " + msg.getObject());
}
@Override
public void viewAccepted(View view) {
        super.viewAccepted(view);
        System.out.println("** view: " + view.getMembers().get(0).toString());
}


JChannel channel;


public static void main(String[] args) throws Exception {
        new SimpleChat().start(args[0]);
}


private void start(String name) throws Exception {
        channel=new JChannel();
        channel.setReceiver(this);
        channel.setName(name);
        channel.connect("ChatCluster");

        List<Address> addrs = channel.getView().getMembers();
        if("C".equals(name)){
                for(Address addr:addrs){
                       if("B".equals(addr.toString())){
                        channel.send(new Message(addr, null, "hello Jgroup"));
                       }
                }
        }
}


}

 

Chat例子
实现功能
我们来写一个聊天程序,只支持文本的。我们要实现如下功能

  • 所有的SimpleChat实例可以相互找到并组成一个集群。
  • 没必要创建一个中心的ChatServer,这样就不会有单点故障。
  • 聊天消息将被发到集群中的所有实例。
  • 当一个实例加入或退出(或崩溃)时,其他实例都将得到通知。
  • 我们维护一个集群内的聊天记录state。新加入的实例可以查询聊天记录。



代码
代码就是官方的例子,我加入了详细的注释。

Java代码
  1. import java.io.BufferedReader;  
  2. import java.io.DataInputStream;  
  3. import java.io.DataOutputStream;  
  4. import java.io.InputStream;  
  5. import java.io.InputStreamReader;  
  6. import java.io.OutputStream;  
  7. import java.util.LinkedList;  
  8. import java.util.List;  
  9.   
  10. import org.jgroups.JChannel;  
  11. import org.jgroups.Message;  
  12. import org.jgroups.ReceiverAdapter;  
  13. import org.jgroups.View;  
  14. import org.jgroups.util.Util;  
  15.   
  16. public class SimpleChat extends ReceiverAdapter {  
  17.     JChannel channel;  
  18.     String user_name = "ABC";  
  19.     private List<String> state = new LinkedList<String>();  
  20.   
  21.     private void start() throws Exception {  
  22.         channel = new JChannel(); //使用默认配置udp.xml  
  23.         channel.setReceiver(this); //指定Receiver用来收消息和得到View改变的通知  
  24.         channel.connect("ChatCluster"); //连接到集群  
  25.           
  26.         //刚加入集群时,我们通过getState()获取聊天历史记录  
  27.         //getState()的第一个参数代表目的地地址,这里传null代表第一个实例(coordinator)  
  28.         //第二个参数代表等待超时时间,我们等待10秒。如果时间到了,State传递不过来,会抛例外。也可以传0代表永远等下去  
  29.         channel.getState(null10000);  
  30.         eventLoop();  
  31.         channel.close();  
  32.     }  
  33.   
  34.     private void eventLoop() {  
  35.         BufferedReader in = new BufferedReader(new InputStreamReader(System.in));  
  36.   
  37.         while (true) {  
  38.             try {  
  39.                 System.out.print("> ");  
  40.                 System.out.flush();  
  41.                 String line = in.readLine().toLowerCase();  
  42.                 if (line.startsWith("quit") || line.startsWith("exit")) {  
  43.                     break;  
  44.                 }  
  45.                 line = "[" + user_name + "] " + line;  
  46.   
  47.                 //Message构造函数的第一个参数代表目的地地址,这里传null代表要发消息给集群内的所有地址  
  48.                 //第二个参数表示源地址,传null即可,框架会自动赋值  
  49.                 //第三个参数line会被序列化成byte[]然后发送,推荐自己序列化而不是用java自带的序列化  
  50.                 Message msg = new Message(nullnull, line);  
  51.                 channel.send(msg); //发消息到集群  
  52.   
  53.             } catch (Exception e) {  
  54.             }  
  55.         }  
  56.     }  
  57.       
  58.     @Override  
  59.     //每当有实例加入或者离开集群(或崩溃)的时候,viewAccepted方法会被调用  
  60.     public void viewAccepted(View new_view) {  
  61.          System.out.println("** view: " + new_view);   
  62.     }  
  63.   
  64.     @Override  
  65.     //有消息时,byte[]会被反序列化成Message对象,也可以用Message.getBuffer得到byte[]然后自己反序列化。  
  66.     public void receive(Message msg) {  
  67.         String line = msg.getSrc() + ": " + msg.getObject();  
  68.         System.out.println(line);  
  69.         //加入到历史记录  
  70.         synchronized (state) {  
  71.             state.add(line);  
  72.         }  
  73.     }  
  74.       
  75.     @Override  
  76.     public void getState(OutputStream output) throws Exception {  
  77.         //当JChannel.getState()被调用时,某个原来就在集群中的实例的getState会被调用用来得到集群的共享state  
  78.         //Util.objectToStream方法将state序列化为output二进制流   
  79.         synchronized (state) {  
  80.             Util.objectToStream(state, new DataOutputStream(output));   
  81.         }  
  82.     }  
  83.       
  84.     @Override  
  85.     public void setState(InputStream input) throws Exception {  
  86.         //当以上集群的共享state被得到后,新加入集群的实例的setState方法就会被调用了  
  87.         List<String> list = (List<String>) Util.objectFromStream(new DataInputStream(input));  
  88.         synchronized (state) {  
  89.             state.clear();  
  90.             state.addAll(list);  
  91.         }  
  92.   
  93.         System.out.println(list.size() + " messages in chat history):");  
  94.         for (String str : list) {  
  95.             System.out.println(str);  
  96.         }  
  97.     }  
  98.       
  99.     public static void main(String[] args) throws Exception {  
  100.         new SimpleChat().start();  
  101.     }  
  102.   
  103. }  



 功能测试

1.运行该代码3次,开启了3个实例,观察控制台,可以看到每有一个实例加入集群,其他客户端都会得到通知(viewAccepted被调用)。
2.随便哪个客户端发一条消息,其他客户端都能收到这条消息。
3.其中一个客户端输入exit,其他客户端都会得到通知。
4.模拟崩溃,可以杀死某个客户端进程,可以观察到其他客户端可以得到通知。
5.新加入的客户端可以看到聊天历史记录。

 

 监控测试

为了探索jgroups的内在机理,我们用Process Explorer做另一个测试。
先开启第1个SimpleChat
看到第1台机器的53242开始监听


开启第2个SimpleChat
第1台机器的53244和第2台机器的53243建立连接
第2台机器的53245和第1台机器的53242建立连接


开启第3个SimpleChat
第1台机器的53244和第2台机器的53243建立连接
第2台机器的53247和第3台机器的53246建立连接
第3台机器的53248和第1台机器的53242建立连接


开启第4个SimpleChat
第1台机器的53244和第2台机器的53243建立连接
第2台机器的53247和第3台机器的53246建立连接
第3台机器的53250和第4台机器的53249建立连接
第4台机器的53251和第1台机器的53242建立连接


杀死第3个SimpleChat
第1台机器的53244和第2台机器的53243建立连接
第2台机器的53252和第4台机器的53249建立连接
第4台机器的53251和第1台机器的53242建立连接


我们管中窥豹,略微嗅到了jgroups是怎样实现可靠多播的,就是采用一个环将各个节点连接起来(TCP连接)。


当有一个节点崩溃(Client3),这个环会重新连接成一个新的环。图中的蓝线便是为了修补这个环所建立的新的连接。


图中红色的端口是UDP的意思,这个端口负责多播通讯,图中看可出是45588端口,jgroups.jar包默认的udp.xml印证了这一点。

Java代码 
  1. <UDP  
  2.      mcast_port="${jgroups.udp.mcast_port:45588}" />  



jgroups的应用
上面例子程序我们已经可以看到,jgroups可以用来做state replication
以下项目场景都使用了jgroups
JBoss Application Server Clustering
OSCache Clustering
Jetty HTTP session replication
Tomcat HTTP session replication

 


 

 

JGroups的源代码非常赏心悦目,组织非常清晰,有必要深入研究它,不仅仅为了学会JGroups工作原理,而且也能学到诸如项目组织、模块划分之类的项目开发经验。

 

目录结构 - jgroups-3.6.0.Final.jar (二进制发行包)

[-]jgroups-3.6.0.Final
    EncryptKeyStore.xml
    EncryptNoKeyStore.xml
    INSTALL.html
    LICENSE
    README
    auth_X509.xml
    auth_fixedlist.xml
    auth_krb5.xml
    auth_md5.xml
    auth_regex.xml
    auth_simple.xml
    encrypt-entire-message.xml
    encrypt.xml
    execution-service.xml
    fast.xml
    flush-tcp.xml
    flush-udp.xml
    fork-stacks.xml
    fork-stacks.xsd
    jg-magic-map.xml
    jg-messages.properties
    jg-messages_de.properties
    jg-protocol-ids.xml
    jgroups-3.6.xsd
    jgroups.xsd
    log4j2.xml
    mping.xml
    relay.xsd
    relay1.xml
    relay2.xml
    rules.xml
    rules.xsd
    sequencer.xml
    settings.xml
    tcp-nio.xml
    tcp.xml
    tcpgossip.xml
    toa.xml
    tunnel.xml
    udp-largecluster.xml
    udp.xml
    [-]META-INF
        MANIFEST.MF
    [-]org
        [-]jgroups
            Address.class
            AnycastAddress.class
            Channel$State.class
            Channel.class
            ChannelListener.class
            Event.class
            Global.class
            Header.class
            ...
            JChannel.class
            ...

我们会在后面的章节描述这些class文件,本节我们只简单描述这个包里面的XML文件的作用:

  • EncryptKeyStore.xml:包含加密协议的协议栈。可以指定keystore以及keystore访问密码,和配置SSL一样。
  • EncryptNoKeyStore.xml:包含加密协议的协议栈。非对称算法为RSA,对称算法为"Blowfish"。(自动生成临时keystore?)
  • INSTALL.html:如何使用JGroups以及运行其中的演示例子。
  • LICENSE:许可证为 "Apache License, Version 2.0"。
  • README:JGroups介绍。
  • auth_X509.xml:包含含身份认证协议的协议栈。使用标准的X509技术,设置keystore、加密类型(如:RSA)等等。
  • auth_fixedlist.xml:包含含身份认证协议的协议栈。列举出固定的受信任的集群成员。
  • auth_krb5.xml:包含含身份认证协议的协议栈。用Kerboras技术认证。
  • auth_md5.xml:包含含身份认证协议的协议栈。用MD5哈希算法对认证口令计算哈希值,再通过网络传输哈希值,供集群成员相互认证。
  • auth_regex.xml:包含含身份认证协议的协议栈。只接受某个正则表达式所描述的IP地址范围。
  • auth_simple.xml:包含含身份认证协议的协议栈。简单口令认证。
  • encrypt-entire-message.xml:包含加密协议的协议栈。非对称算法为RSA,对称算法为"AES/ECB/PKCS5Padding"。(自动生成临时keystore?)
  • encrypt.xml:包含加密协议的协议栈。非对称算法为RSA,对称算法为"AES/ECB/PKCS5Padding"。(自动生成临时keystore?)
  • execution-service.xml:包含CENTRAL_EXECUTOR协议的协议栈。供ExecutionService使用。
  • fast.xml:快速配置的协议栈,供本地模式使用,也就是说所有集群成员位于相同的主机上。
  • flush-tcp.xml:基于TCP的协议栈。包含flush、流控和消息绑定协议。常用于IP多播被禁用的情况。
  • flush-udp.xml:基于UDP的协议栈。包含flush协议。
  • fork-stacks.xml:分叉协议栈。<fork-stacks/>
  • fork-stacks.xsd:分叉协议栈的语法。
  • jg-magic-map.xml:魔数映射表。比如,1对应"org.jgroups.stack.IpAddress",31对应"org.jgroups.View"。应用场合之一:为消息打标签。
  • jg-messages.properties:国际化文件,英文。
  • jg-messages_de.properties:国际化文件,德文。
  • jg-protocol-ids.xml:JGroups各官方协议ID。所有协议的ID必须唯一,客户化协议不要和这些ID重复。
  • jgroups-3.6.xsd:JGroups 3.6 协议栈配置语法。
  • jgroups.xsd:JGroups 协议栈配置语法(和jgroups-3.6.xsd相同)。
  • log4j2.xml:log4j配置。
  • mping.xml:基于TCP的协议栈。包含MPING协议。
  • relay.xsd:RelayConfigurationType的语法。
  • relay1.xml:RelayConfiguration协议,开发之中,尽量不要使用。
  • relay2.xml:RelayConfiguration协议,开发之中,尽量不要使用。
  • rules.xml:rules配置。
  • rules.xsd:rules配置的语法。
  • sequencer.xml:基于UDP的协议栈。包含SEQUENCER协议,将所有消息和视图进行排序。
  • settings.xml:JGroups Maven settings.xml 示例文件。
  • tcp-nio.xml:基于TCP_NIO的协议栈。包含flush、流控和消息绑定协议。常用于IP多播被禁用的情况。
  • tcp.xml:基于TCP的协议栈。包含flush、流控和消息绑定协议。常用于IP多播被禁用的情况。
  • tcpgossip.xml:基于TCP的协议栈。包含TCPGOSSIP协议。
  • toa.xml:基于UDP的协议栈。包含TOA协议。
  • tunnel.xml:基于TUNNEL的协议栈。
  • udp-largecluster.xml:基于UDP的协议栈。用于大型集群中。该配置文件还在开发之中。
  • udp.xml:基于UDP的协议栈。这是JGroups默认使用的协议栈。

注意:

  • 大多数XML文件的顶部注解都描述了这个文件的作用,不过有些注解明显是拷贝过来的,不匹配。
  • JGroups的协议栈处理方式为:"底层传输协议"将报文向上递交,每经过一些协议都会被处理一次,直到最后被交给应用程序。所以,身份认证协议以及加密协议通常位于传输协议之上,GMS(分组成员关系)协议之下;越靠近传输协议,那么被加密的包头就越多。

频道:Java
扫描本文章二维码可手机访问: