站长资讯网
最全最丰富的资讯网站

内嵌消息派发服务器

一个java消息服务组件,可用于改善代码数据流,降低模块之间的耦合,支持异步编程结构。 可以用于网络爬虫,中间件服务器等领域。

01 package blackstar;
02  
03 public class ChannelExistException  extends RuntimeException{
04  
05     /**
06      *
07      */
08     private static final long serialVersionUID = 5251366244947636179L;
09  
10 }

 
01 package blackstar;
02  
03 public class ChannelNotExistException extends RuntimeException{
04  
05     /**
06      *
07      */
08     private static final long serialVersionUID = 8386851609929711934L;
09  
10 }

 
01 package blackstar;
02  
03 public class ClosedException  extends RuntimeException{
04  
05     /**
06      *
07      */
08     private static final long serialVersionUID = 2736603361486637645L;
09  
10 }

 
1 package blackstar;
2  
3 public interface Message {
4  
5 }
 

1 package blackstar;
2  
3 public interface Receiver {
4     public void receive(Message message);
5 }

 
01 package blackstar;
02  
03 import java.util.concurrent.ConcurrentHashMap;
04 import java.util.concurrent.CopyOnWriteArrayList;
05 import java.util.concurrent.CopyOnWriteArraySet;
06 import java.util.concurrent.ExecutorService;
07 import java.util.concurrent.Executors;
08  
09 public class Server {
10     private boolean isRunning = false;
11  
12     private CopyOnWriteArraySet<Integer> channels = new CopyOnWriteArraySet<Integer>();
13  
14     private ConcurrentHashMap<Integer, CopyOnWriteArrayList<Receiver>> map = new ConcurrentHashMap<Integer, CopyOnWriteArrayList<Receiver>>();
15  
16     private ExecutorService executer = Executors.newCachedThreadPool();
17  
18     public Server() {
19  
20     }
21  
22     public synchronized void start() {
23         isRunning = true;
24     }
25  
26     public synchronized void close() {
27         isRunning = false;
28     }
29  
30     public synchronized void openChannel(Integer channel) {
31         if (!isRunning) {
32             throw new ClosedException();
33         }
34         if (channels.contains(channel)) {
35             throw new ChannelExistException();
36         }
37         channels.add(channel);
38         map.put(channel, new CopyOnWriteArrayList<Receiver>());
39     }
40  
41     public synchronized void closeChannel(Integer channel) {
42         channels.remove(channel);
43         map.remove(channel);
44     }
45  
46     public synchronized void add(Integer channel, Receiver receiver) {
47         if (!isRunning) {
48             throw new ClosedException();
49         }
50         if (!channels.contains(channel)) {
51             throw new ChannelNotExistException();
52         }
53         map.get(channel).add(receiver);
54     }
55  
56     public synchronized void remove(Integer channel, Receiver receiver) {
57         if (!isRunning) {
58             throw new ClosedException();
59         }
60         if (!channels.contains(channel)) {
61             throw new ChannelNotExistException();
62         }
63         map.get(channel).remove(receiver);
64     }
65  
66     public void post(final Integer channel, final Message message) {
67         executer.execute(new Runnable() {
68             @Override
69             public void run() {
70                 CopyOnWriteArrayList<Receiver> list = map.get(channel);
71                 for (Receiver r : list) {
72                     r.receive(message);
73                 }
74             }
75         });
76     }
77 }

 

赞(0)
分享到: 更多 (0)

网站地图   沪ICP备18035694号-2    沪公网安备31011702889846号