打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Redis编程实践【pub/sub】

    Redis或许已经在很多企业开始推广并试水,本文也根据个人的实践,简单描述一下Redis在实际开发过程中的使用(部署与架构,稍后介绍),程序执行环境为java + jedis,关于spring下如何集成redis-api,稍后介绍吧。

 

前言:下载redis-2.6.2,安装好redis之后,请在redis.conf文件中,将如下3个配置属性开启(仅供测试使用):

    ##客户端链接的端口,也是server端侦听client链接的端口      ##每个client实例,都将和server在此端口上建立tcp长链接      port 6379      ## server端绑定的ip地址,如果一个物理机器有多个网络接口时,可以明确指定为某个网口的ip地址      bind 127.0.0.1      ##链接中io操作空闲时间,如果在指定时间内,没有IO操作,链接将会被关闭      ##此属性和TCP链接中的timeout选项一样,建议设置为0,很多时候,我们一个应用也只会有一个redis实例      ##不过,如果你使用连接池的话,你需要对此参数做额外的考虑。      timeout 0  

 

Pub/Sub: "发布/订阅",对于此功能,我们将会想到很多JMS实现,Redis提供此功能显的“多此一举”;不过这个功能在redis中,被设计的非常轻量级和简洁,它做到了消息的“发布”和“订阅”的基本能力,但是尚未提供JMS中关于消息的持久化/耐久性等各种企业级的特性。

    一个Redis client发布消息,其他多个redis client订阅消息,发布的消息“即发即失”,redis不会持久保存发布的消息;消息订阅者也将只能得到订阅之后的消息,通道中此前的消息将无从获得。这就类似于JMS中“非持久”类型的消息。

    消息发布者,即publish客户端,无需独占链接,你可以在publish消息的同时,使用同一个redis-client链接进行其他操作(例如:INCR等)

    消息订阅者,即subscribe客户端,需要独占链接,即进行subscribe期间,redis-client无法穿插其他操作,此时client以阻塞的方式等待“publish端”的消息;这一点很好理解,因此subscribe端需要使用单独的链接,甚至需要在额外的线程中使用。

    一旦subscribe端断开链接,将会失去部分消息,如果你非常关注每个消息,那么你应该考虑使用JMS或者基于Redis做一些额外的补充工作,如果你期望订阅是持久的,那么如下的设计思路可以借鉴(如下原理基于JMS):

    1) subscribe端首先向一个Set集合中增加“订阅者ID”,此Set集合保存了“活跃订阅”者,订阅者ID标记每个唯一的订阅者,例如:sub:email,sub:web。此SET称为“活跃订阅者集合”

    2) subcribe端开启订阅操作,并基于Redis创建一个以“订阅者ID”为KEY的LIST数据结构,此LIST中存储了所有的尚未消费的消息。此LIST称为“订阅者消息队列”

    3) publish端:每发布一条消息之后,publish端都需要遍历“活跃订阅者集合”,并依次向每个“订阅者消息队列”尾部追加此次发布的消息。

    4) 到此为止,我们可以基本保证,发布的每一条消息,都会持久保存在每个“订阅者消息队列”中。

    5) subscribe端,每收到一个订阅消息,在消费之后,必须删除自己的“订阅者消息队列”头部的一条记录。

    6) subscribe端启动时,如果发现自己的自己的“订阅者消息队列”有残存记录,那么将会首先消费这些记录,然后再去订阅。

 

--------------------------------------------------------------非持久化订阅-------------------------------------------------------

PrintListener.java:订阅者消息处理器

public class PrintListener extends JedisPubSub{	@Override	public void onMessage(String channel, String message) {		String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");		System.out.println("message receive:" + message + ",channel:" + channel + "..." + time);		//此处我们可以取消订阅		if(message.equalsIgnoreCase("quit")){			this.unsubscribe(channel);		}	}...}
 

PubClient.java:消息发布端

public class PubClient {	private Jedis jedis;//	public PubClient(String host,int port){		jedis = new Jedis(host,port);	}		public void pub(String channel,String message){		jedis.publish(channel, message);	}		public void close(String channel){		jedis.publish(channel, "quit");		jedis.del(channel);//	}}

 

SubClient.java:消息订阅端

public class SubClient {	private Jedis jedis;//		public SubClient(String host,int port){		jedis = new Jedis(host,port);	}		public void sub(JedisPubSub listener,String channel){		jedis.subscribe(listener, channel);		//此处将会阻塞,在client代码级别为JedisPubSub在处理消息时,将会“独占”链接		//并且采取了while循环的方式,侦听订阅的消息		//	}}

 

PubSubTestMain.java:测试引导类

public class PubSubTestMain {	/**	 * @param args	 */	public static void main(String[] args) throws Exception{		PubClient pubClient = new PubClient(Constants.host, Constants.port);		final String channel = "pubsub-channel";		pubClient.pub(channel, "before1");		pubClient.pub(channel, "before2");		Thread.sleep(2000);		//消息订阅着非常特殊,需要独占链接,因此我们需要为它创建新的链接;		//此外,jedis客户端的实现也保证了“链接独占”的特性,sub方法将一直阻塞,		//直到调用listener.unsubscribe方法		Thread subThread = new Thread(new Runnable() {			@Override			public void run() {				try{					SubClient subClient = new SubClient(Constants.host, Constants.port);					System.out.println("----------subscribe operation begin-------");					JedisPubSub listener = new PrintListener();					//在API级别,此处为轮询操作,直到unsubscribe调用,才会返回					subClient.sub(listener, channel);					System.out.println("----------subscribe operation end-------");				}catch(Exception e){					e.printStackTrace();				}							}		});		subThread.start();		int i=0;		while(i < 10){			String message = RandomStringUtils.random(6, true, true);//apache-commons			pubClient.pub(channel, message);			i++;			Thread.sleep(1000);		}		//被动关闭指示,如果通道中,消息发布者确定通道需要关闭,那么就发送一个“quit”		//那么在listener.onMessage()中接收到“quit”时,其他订阅client将执行“unsubscribe”操作。		pubClient.close(channel);		//此外,你还可以这样取消订阅		//listener.unsubscribe(channel);	}}

 

--------------------------------------------------------------持久化订阅-------------------------------------------------------

 

PPrintListener.java

public class PPrintListener extends JedisPubSub{	private String clientId;	private PSubHandler handler;		public PPrintListener(String clientId,Jedis jedis){		this.clientId = clientId;		handler = new PSubHandler(jedis);	}		@Override	public void onMessage(String channel, String message) {		//此处我们可以取消订阅		if(message.equalsIgnoreCase("quit")){			this.unsubscribe(channel);		}		handler.handle(channel, message);	}		private void message(String channel,String message){		String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");		System.out.println("message receive:" + message + ",channel:" + channel + "..." + time);	}	@Override	public void onPMessage(String pattern, String channel, String message) {		System.out.println("message receive:" + message + ",pattern channel:" + channel);			}	@Override	public void onSubscribe(String channel, int subscribedChannels) {		handler.subscribe(channel);		System.out.println("subscribe:" + channel + ";total channels : " + subscribedChannels);			}	@Override	public void onUnsubscribe(String channel, int subscribedChannels) {		handler.unsubscribe(channel);		System.out.println("unsubscribe:" + channel + ";total channels : " + subscribedChannels);			}	@Override	public void onPUnsubscribe(String pattern, int subscribedChannels) {		System.out.println("unsubscribe pattern:" + pattern + ";total channels : " + subscribedChannels);			}	@Override	public void onPSubscribe(String pattern, int subscribedChannels) {		System.out.println("subscribe pattern:" + pattern + ";total channels : " + subscribedChannels);			}		@Override	public void unsubscribe(String... channels) {        super.unsubscribe(channels);        for(String channel : channels){        	handler.unsubscribe(channel);        }    }		class PSubHandler {		private Jedis jedis;		PSubHandler(Jedis jedis){			this.jedis = jedis;		}		public void handle(String channel,String message){			int index = message.indexOf("/");			if(index < 0){				return;			}			Long txid = Long.valueOf(message.substring(0,index));			String key = clientId + "/" + channel;			while(true){					String lm = jedis.lindex(key, 0);//获取第一个消息					if(lm == null){						break;					}					int li = lm.indexOf("/");					//如果消息不合法,删除并处理					if(li < 0){						String result = jedis.lpop(key);//删除当前message						//为空						if(result == null){							break;						}						message(channel, lm);						continue;					}					Long lxid = Long.valueOf(lm.substring(0,li));//获取消息的txid					//直接消费txid之前的残留消息					if(txid >= lxid){						jedis.lpop(key);//删除当前message						message(channel, lm);						continue;					}else{						break;					}			}		}				public void subscribe(String channel){			String key = clientId + "/" + channel;			boolean exist = jedis.sismember(Constants.SUBSCRIBE_CENTER,key);			if(!exist){				jedis.sadd(Constants.SUBSCRIBE_CENTER, key);			}		}				public void unsubscribe(String channel){			String key = clientId + "/" + channel;			jedis.srem(Constants.SUBSCRIBE_CENTER, key);//从“活跃订阅者”集合中删除			jedis.del(key);//删除“订阅者消息队列”		}	}}

 

 

PPubClient.java

public class PPubClient {	private Jedis jedis;//	public PPubClient(String host,int port){		jedis = new Jedis(host,port);	}		/**	 * 发布的每条消息,都需要在“订阅者消息队列”中持久	 * @param message	 */	private void put(String message){		//期望这个集合不要太大		Set<String> subClients = jedis.smembers(Constants.SUBSCRIBE_CENTER);		for(String clientKey : subClients){			jedis.rpush(clientKey, message);		}	}		public void pub(String channel,String message){		//每个消息,都有具有一个全局唯一的id		//txid为了防止订阅端在数据处理时“乱序”,这就要求订阅者需要解析message		Long txid = jedis.incr(Constants.MESSAGE_TXID);		String content = txid + "/" + message;		//非事务		this.put(content);		jedis.publish(channel, content);//为每个消息设定id,最终消息格式1000/messageContent			}		public void close(String channel){		jedis.publish(channel, "quit");		jedis.del(channel);//删除	}		public void test(){		jedis.set("pub-block", "15");		String tmp = jedis.get("pub-block");		System.out.println("TEST:" + tmp);	}}

 

PPSubClient.java

public class PSubClient {	private Jedis jedis;//	private JedisPubSub listener;//单listener		public PSubClient(String host,int port,String clientId){		jedis = new Jedis(host,port);		listener = new PPrintListener(clientId, new Jedis(host, port));	}		public void sub(String channel){		jedis.subscribe(listener, channel);	}		public void unsubscribe(String channel){		listener.unsubscribe(channel);	}	}

 

PPubSubTestMain.java

 

public class PPubSubTestMain {	/**	 * @param args	 */	public static void main(String[] args) throws Exception{		PPubClient pubClient = new PPubClient(Constants.host, Constants.port);		final String channel = "pubsub-channel-p";		final PSubClient subClient = new PSubClient(Constants.host, Constants.port,"subClient-1");		Thread subThread = new Thread(new Runnable() {						@Override			public void run() {				System.out.println("----------subscribe operation begin-------");				//在API级别,此处为轮询操作,直到unsubscribe调用,才会返回				subClient.sub(channel);				System.out.println("----------subscribe operation end-------");							}		});		subThread.setDaemon(true);		subThread.start();		int i = 0;		while(i < 2){			String message = RandomStringUtils.random(6, true, true);//apache-commons			pubClient.pub(channel, message);			i++;			Thread.sleep(1000);		}		subClient.unsubscribe(channel);	}	}

 

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Redis学习笔记8--Redis发布/订阅
【Redis】jedis客户端实现redis消息的发布订阅(实时消息中间件)
Redis分布式事务
Java中使用Jedis操作Redis
使用Spring Data Redis操作Redis(一)
基于redis内存数据库简单使用
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服