activemq和kafka區別 activemq消息持久化方式
隊列模式(點對點模式,P2P)特點:1、客戶端包括生產者和消費者;
2、隊列中的消息只能被一個消費者消費;
3、消費者可以隨時消費隊列中的消息;

文章插圖
隊列模式和主題模式的區別:
1、提前訂閱,隊列模式:消費者不需要提前訂閱也可以消費消息;主題模式:只有提前進行訂閱的消費者才能成功消費消息;
2、多個消費者分配消息:隊列模式:只能平均消費消息,被別的消費者消費的消息不能重復被其他的消費者消費;主題模式:每個訂閱者都可以消費主題模式中的每一條消息;
案例代碼:【activemq和kafka區別 activemq消息持久化方式】生產者:
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQProducer {public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException {//創建連接工廠 ,,按照定的url地址給定默認的用戶名和密碼ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);//通過連接工廠獲取connection連接 并啟動訪問Connection connection = activeMQConnectionFactory.createConnection();connection.start();//創建會話session需要兩個參數,第一個事務,第二個簽收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建目的地(選擇是隊列還是主題)Queue queue = session.createQueue(QUEUE_NAME);//創建消息的生產者MessageProducer messageProducer = session.createProducer(queue);//通過使用消息生產者messageProducer生產3條消息發送到隊列中for (int i = 1; i <= 7; i++) {//創建消息一個字符串消息TextMessage textMessage = session.createTextMessage("msg---->" + i);//通過messageProducer 發布消息messageProducer.send(textMessage);}//關閉資源messageProducer.close();session.close();connection.close();System.out.println("消息發送到MQ成功");}}消費者1:import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQConsumer {public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";public static final String QUEUE_NAME="queue01";public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);//通過連接工廠獲取connection連接 并啟動訪問Connection connection = activeMQConnectionFactory.createConnection();connection.start();//創建會話session需要兩個參數,第一個事務,第二個簽收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建目的地(選擇是隊列還是主題)Queue queue = session.createQueue(QUEUE_NAME);//創建消息的消費者MessageConsumer messageConsumer = session.createConsumer(queue);while (true){//從隊列中獲取消息receive未設置最大時間 是阻塞的,TextMessage textMessage = (TextMessage) messageConsumer.receive();if (textMessage !=null){System.out.println("消費者接受到消息---->"+textMessage.getText());}else {break;}}messageConsumer.close();session.close();connection.close();}}輸出: INFO | Successfully connected to tcp://192.168.1.17:61616消費者接受到消息---->msg---->2消費者接受到消息---->msg---->4消費者接受到消息---->msg---->6消費者2:import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;import java.io.IOException;public class ActiveMQConsumerListener {public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException, IOException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);//通過連接工廠獲取connection連接 并啟動訪問Connection connection = activeMQConnectionFactory.createConnection();connection.start();//創建會話session需要兩個參數,第一個事務,第二個簽收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建目的地(選擇是隊列還是主題)Queue queue = session.createQueue(QUEUE_NAME);//創建消息的消費者MessageConsumer messageConsumer = session.createConsumer(queue);//通過監聽的機制消費消息messageConsumer.setMessageListener((message) -> {if (message != null && message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;try {System.out.println("消費者接受到消息---->" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});//不關閉控制臺如果不加這句話,在下面可能在連接的時候直接關閉了,造成無法消費的問題System.in.read();messageConsumer.close();session.close();connection.close();}}輸出: INFO | Successfully connected to tcp://192.168.1.17:61616消費者接受到消息---->msg---->1消費者接受到消息---->msg---->3消費者接受到消息---->msg---->5消費者接受到消息---->msg---->7
文章插圖
Number Of Consumers:表示消費者數量;
Number Of Pending Messages:等待消費的消息,這個是當前未出隊列的數量;
Messages Enqueued:進入隊列的消息;( 這個數量只增不減,重啟后會清零);
Messages Dequeued:出了隊列的消息 可以理解為是消費者消費掉的數量 (重啟后會清零);
持久化案例代碼:ActiveMQ持久化,生產者產生的數據,在沒有被消費者消費時,先保存到數據庫中,當數據被消費者消費后,再從數據庫中刪除 。
生產者:
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQProducer {public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";public static final String QUEUE_NAME = "queue02";public static void main(String[] args) throws JMSException {//創建連接工廠 ,,按照定的url地址給定默認的用戶名和密碼ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);//通過連接工廠獲取connection連接 并啟動訪問Connection connection = activeMQConnectionFactory.createConnection();connection.start();//創建會話session需要兩個參數,第一個事務,第二個簽收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建目的地(選擇是隊列還是主題)Queue queue = session.createQueue(QUEUE_NAME);//創建消息的生產者MessageProducer messageProducer = session.createProducer(queue);// 消息持久化messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);//通過使用消息生產者messageProducer生產3條消息發送到隊列中for (int i = 1; i <= 7; i++) {//創建消息一個字符串消息TextMessage textMessage = session.createTextMessage("msg---->" + i);//通過messageProducer 發布消息messageProducer.send(textMessage);}//關閉資源messageProducer.close();session.close();connection.close();System.out.println("消息發送到MQ成功");}}
推薦閱讀
- 蝦和胡蘿卜能一起吃嗎 蝦和胡蘿卜是否能一起吃的解析
- 北疆和南疆哪個大
- 煮粥和煮稀飯有什么區別
- 糯米膠和改性淀粉膠哪個好
- 孜然怎么炒
- 毛木耳和木耳有區別毛木耳
- 5年保修和5年質保的區別
- 會計和金融考研哪個更好
- 龍族優美句子
- 微信心碎心情簽名
