189 8069 5689

mq取出消息java代码 java获取xml内容去掉bom头

java怎么将mq接收的文件消息提取出来

WebSphere MQ 接收发送

我们提供的服务有:成都做网站、成都网站制作、成都外贸网站建设、微信公众号开发、网站优化、网站认证、成安ssl等。为千余家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的成安网站制作公司

添加mq jar

类介绍:

SendMSG:消息发送类。

Main():主方法。

SendMSG():消息发送方法。

方法描述:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

package test;

public class SendMSG{

MQEnvironment.hostname = "192.168.10.201";

//通道类型为服务器连接通道

MQEnvironment.channel = "tongdao";

MQEnvironment.CCSID = 1381;

//消息队列端口号

MQEnvironment.port = 10618;

try{

//建立队列管理器QM_SERVER为队列管理器名称

MQQueueManager qMgr = new MQQueueManager("test");

int openOptions = MQC.MQOO_INPUT_AS_Q_DEF|MQC.MQOO_OUTPUTMQC.MQOO_INQUIRE;//建立队列INITQ队列名称INITQ为本地队列

MQQueue queue = qMgr.accessQueue("wanghui",openOptions,null,null,null);

System.out.println("成功建立通道");

MQMessage message = new MQMessage();

message.format = MQC.MQFMT_STRING;

message.characterSet = 1381;

message.writeString("王辉");

message.expiry = -1;//设置消息用不过期

queue.put(message);//将消息放入队列

queue.close();//关闭队列

qMgr.disconnect();//断开连接

}catch(EOFExceptione){

e.printStackTrace();

}catch(MQExceptione){

e.printStackTrace();

}catch(Exceptione){

e.printStackTrace();

}

}

ReceiveMSG:消息接收类。

Main():主方法。

ReceiveMSG():消息接收方法。

public class ReceiveMSG {

MQEnvironment.hostname="192.168.10.201";//通道类型为服务器连接通道

MQEnvironment.channel="tongdao";

MQEnvironment.CCSID=1381;

MQEnvironment.port=10618;

try{

//建立队列管理器QM_SERVER为队列管理器名称

MQQueueManager qMgr = new MQQueueManager("test");

int openOptions=MQC.MQOO_INPUT_AS_Q_DEF|MQC.MQOO_OUTPUT|MQC.MQOO_INQUIRE;//建立队列INITQ队列名称INITQ为本地队列

MQQueue queue=qMgr.accessQueue("wanghui",openOptions,null,null,null);

System.out.println("成功建立通道");

MQMessage message= new MQMessage();

message.format=MQC.MQFMT_STRING;

message.characterSet=1381;

//从队列中获取消息

MQGetMessage Optionspmo=new MQGetMessageOptions();

queue.get(message,pmo);

Stringchars=message.readLine();

System.out.println(chars);

queue.close();//关闭队列

qMgr.disconnect();//断开连接

}catch(EOFExceptione){

e.printStackTrace();

}catch(MQExceptione){

e.printStackTrace();

}catch(Exceptione){

e.printStackTrace();

}

}

求Java连接MQ的代码,通过代码实现收发消息,谢谢各位大神!

import javax.jms.BytesMessage;

import javax.jms.Connection;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.MessageProducer;

import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.fxun.plant.vo.CommandVO;

public class ProducerTool extends Thread {

private Destination destination;

// private int messageCount = 500;

long sleepTime = 0;

// private boolean verbose = true;

// private int messageSize = 255;

private long timeToLive = 0; // 消息存活时间

private String user = ActiveMQConnection.DEFAULT_USER;

private String password = ActiveMQConnection.DEFAULT_PASSWORD;

private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

private String subject; // subject的名字,默认是TOOL.DEFAULT

// private boolean topic;

private boolean transacted = false; // 是否采用事务

// private boolean persistent = false;

private P2PQueue p2pQueue;

public ProducerTool(String user, String password, String url, String subject) {

this.user = user;

this.password = password;

this.url = url;

this.subject = subject;

}

public void run() {

Connection connection = null;

try {

// Create the connection.

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

connection = connectionFactory.createConnection();

connection.start();

// Create the session

Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);

destination = session.createQueue(subject);

// Create the producer.

MessageProducer producer = session.createProducer(destination);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

producer.setTimeToLive(timeToLive);

CommandVO commandVO = null;

int size = 0;

while (true) {

size = p2pQueue.getSize();

if (size 0) {

BytesMessage message = session.createBytesMessage();

message.writeInt(size);

for (int i = 0; i size; i++) {

commandVO = p2pQueue.pool();

if(commandVO == null) {

message.writeInt(0);

} else {

message.writeInt(commandVO.getCountSize());

message.writeInt(commandVO.getCommand()); // 指令

message.writeBytes(commandVO.getContent()); // 内容

}

}

producer.send(message);

}

Thread.sleep(300);

}

// Use the ActiveMQConnection interface to dump the connection

// stats.

// ActiveMQConnection c = (ActiveMQConnection) connection;

// c.getConnectionStats().dump(new IndentPrinter());

} catch (Exception e) {

System.out.println("Caught: " + e);

e.printStackTrace();

} finally {

try {

connection.close();

} catch (Throwable ignore) {

}

}

}

public void setPassword(String pwd) {

this.password = pwd;

}

public void setSleepTime(long sleepTime) {

this.sleepTime = sleepTime;

}

public void setSubject(String subject) {

this.subject = subject;

}

public void setTimeToLive(long timeToLive) {

this.timeToLive = timeToLive;

}

public void setTransacted(boolean transacted) {

this.transacted = transacted;

}

public void setUrl(String url) {

this.url = url;

}

public void setUser(String user) {

this.user = user;

}

public P2PQueue getP2pQueue() {

return p2pQueue;

}

public void setP2pQueue(P2PQueue p2pQueue) {

this.p2pQueue = p2pQueue;

}

}

原代码都发给你

java如何获取rabbitmq队列中消息数量

下面是RabbitMQ的消息确认机制:“为了确保消息不会丢失,RabbitMQ支持消息确认机制。客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了。假如客户端在发送ack之前意外死掉了,那么RabbitMQ会将消息投递到下一个consumer客户端。如果有多个consumer客户端,RabbitMQ在投递消息时是轮询的。RabbitMQ如何判断客户端死掉了?唯一根据是客户端连接是否断开。这里没有超时机制,也就是说客户端可以处理一个消息很长时间,只要没断开连接,RabbitMQ就一直等待ack消息。”我现在遇到的问题是这样的:我这边有几条线程去消息队列里取数据,但是会有异常数据导致线程挂掉,就是上边的“客户端在发送ack之前意外死掉了”,RabbitMQ会将消息投递到下一个consumer客户端,这样一条异常数据会把我的所有线程挂掉,我现在想实现这样的功能:如果有异常数据导致进程挂掉,那么我不让RabbitMQ将这条消息投递到下一个consumer客户端,而是放到另一个地方或者另外处理,请问该如何实现呢?

java使用mq get api从mq中取数据怎样触发侦听器连续取数据

{

//前面是准备管理器和队列

MQQueueManager qMgr = new MQQueueManager(qManager);

int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;

MQQueue queue = qMgr.accessQueue(qName, openOptions);

MQMessage rcvMessage = new MQMessage();

MQGetMessageOptions gmo = new MQGetMessageOptions();

gmo.options = gmo.options + MQConstants.MQGMO_WAIT + MQConstants.MQGMO_SYNCPOINT;

//读取五秒超时,这里目的是要有个读取阻塞,和Socket编程类似。

gmo.waitInterval = 5000;

queue.get(rcvMessage, gmo);

//后面就是操作消息的部分【略】

}catch(Exception e){{

//前面是准备管理器和队列

MQQueueManager qMgr = new MQQueueManager(qManager);

int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;

MQQueue queue = qMgr.accessQueue(qName, openOptions);

MQMessage rcvMessage = new MQMessage();

MQGetMessageOptions gmo = new MQGetMessageOptions();

gmo.options = gmo.options + MQConstants.MQGMO_WAIT + MQConstants.MQGMO_SYNCPOINT;

//读取五秒超时,这里目的是要有个读取阻塞,和Socket编程类似。

gmo.waitInterval = 5000;

queue.get(rcvMessage, gmo);

//后面就是操作消息的部分【略】

}catch(Exception e){


网站名称:mq取出消息java代码 java获取xml内容去掉bom头
分享URL:http://cdxtjz.cn/article/ddephdj.html

其他资讯