一个java消息服务组件,可用于改善代码数据流,降低模块之间的耦合,支持异步编程结构。 可以用于网络爬虫,中间件服务器等领域。
03 |
public class ChannelExistException extends RuntimeException{ |
08 |
private static final long serialVersionUID = 5251366244947636179L; |
03 |
public class ChannelNotExistException extends RuntimeException{ |
08 |
private static final long serialVersionUID = 8386851609929711934L; |
03 |
public class ClosedException extends RuntimeException{ |
08 |
private static final long serialVersionUID = 2736603361486637645L; |
3 |
public interface Message { |
3 |
public interface Receiver { |
4 |
public void receive(Message message); |
03 |
import java.util.concurrent.ConcurrentHashMap; |
04 |
import java.util.concurrent.CopyOnWriteArrayList; |
05 |
import java.util.concurrent.CopyOnWriteArraySet; |
06 |
import java.util.concurrent.ExecutorService; |
07 |
import java.util.concurrent.Executors; |
10 |
private boolean isRunning = false; |
12 |
private CopyOnWriteArraySet<Integer> channels = new CopyOnWriteArraySet<Integer>(); |
14 |
private ConcurrentHashMap<Integer, CopyOnWriteArrayList<Receiver>> map = new ConcurrentHashMap<Integer, CopyOnWriteArrayList<Receiver>>(); |
16 |
private ExecutorService executer = Executors.newCachedThreadPool(); |
22 |
public synchronized void start() { |
26 |
public synchronized void close() { |
30 |
public synchronized void openChannel(Integer channel) { |
32 |
throw new ClosedException(); |
34 |
if (channels.contains(channel)) { |
35 |
throw new ChannelExistException(); |
37 |
channels.add(channel); |
38 |
map.put(channel, new CopyOnWriteArrayList<Receiver>()); |
41 |
public synchronized void closeChannel(Integer channel) { |
42 |
channels.remove(channel); |
46 |
public synchronized void add(Integer channel, Receiver receiver) { |
48 |
throw new ClosedException(); |
50 |
if (!channels.contains(channel)) { |
51 |
throw new ChannelNotExistException(); |
53 |
map.get(channel).add(receiver); |
56 |
public synchronized void remove(Integer channel, Receiver receiver) { |
58 |
throw new ClosedException(); |
60 |
if (!channels.contains(channel)) { |
61 |
throw new ChannelNotExistException(); |
63 |
map.get(channel).remove(receiver); |
66 |
public void post(final Integer channel, final Message message) { |
67 |
executer.execute(new Runnable() { |
70 |
CopyOnWriteArrayList<Receiver> list = map.get(channel); |
71 |
for (Receiver r : list) { |