代码拉取完成,页面将自动刷新
同步操作将从 wangqq/oracleAQ-Jms 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
GRANT EXECUTE ON SYS.DBMS_AQ to 'username';
GRANT EXECUTE ON SYS.DBMS_AQADM to 'username';
GRANT EXECUTE ON SYS.DBMS_AQ_BQVIEW to 'username';
GRANT EXECUTE ON SYS.DBMS_AQIN to 'username';
GRANT EXECUTE ON SYS.DBMS_JOB to 'username';
CREATE OR REPLACE TYPE TYPE_QUEUE_INFO AS OBJECT
(
param_1 VARCHAR2(100),
param_2 VARCHAR2(100)
)
begin
sys.dbms_aqadm.create_queue_table(
queue_table => 'QUEUE_TABLE',
queue_payload_type => 'TYPE_QUEUE_INFO',
sort_list => 'ENQ_TIME',
compatible => '10.0.0',
primary_instance => 0,
secondary_instance => 0);
end;
begin
sys.dbms_aqadm.create_queue(
queue_name => 'QUEUE_TEST',
queue_table => 'QUEUE_TABLE',
queue_type => sys.dbms_aqadm.normal_queue,
max_retries => 5,
retry_delay => 0,
retention_time => 0);
end;
begin
-- 启动队列
sys.dbms_aqadm.start_queue(
queue_name => 'QUEUE_TEST'
);
-- 暂停队列
--sys.dbms_aqadm.STOP_QUEUE(
-- queue_name => 'QUEUE_TEST'
--);
-- 删除队列
--sys.dbms_aqadm.DROP_QUEUE(
-- queue_name => 'QUEUE_TEST'
--);
-- 删除对列表
--sys.dbms_aqadm.DROP_QUEUE_TABLE(
-- queue_table => 'QUEUE_TABLE'
--);
end;
CREATE OR REPLACE PROCEDURE pro_queue(param_1 VARCHAR2, param_2 VARCHAR2) as
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload TYPE_QUEUE_INFO;
begin
-- 封装最终消息
o_payload := TYPE_QUEUE_INFO(param_1, param_2);
-- 入队操作,指定队列
dbms_aq.enqueue(queue_name => 'QUEUE_TEST',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle);
-- 出队操作
--dbms_aq.enqueue(queue_name => 'QUEUE_TEST',
-- dequeue_options => r_dequeue_options,
-- message_properties => r_message_properties,
-- payload => o_payload,
-- msgid => v_message_handle);
end pro_queue;
<dependency>
<groupId>com.oracle</groupId>
<artifactId>jmscommon</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>orai18n</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>jta</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>aqapi_g</artifactId>
<version>1.2</version>
</dependency>
spring:
datasource:
url: jdbc:oracle:thin:@ip:port/sid
username: **
password: **
queue:
aq:
# 该队列是否可用,用来控制队列的加载和重连,不可省略
enable: true
# 队列名称,不可省略
name: QUEUE_TEST
# 队列重连的定时任务对应的时间表达式,不可省略
cron: 0 */1 * * * ?
package com.wangqq.jms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
* @Title: MessageAQInit.java
* @Description: AQ 初始化
* @author wangqq
* @date 2020年6月28日 下午3:45:23
* @version 1.0
*/
@Component
public class MessageAQInit implements CommandLineRunner {
@Autowired
private MessageAQConfig aqConfig;
@Autowired
private MessageAQListener listener;
@Override
public void run(String... args) throws RuntimeException {
// 检查消息队列是否启用
if (aqConfig.enable) {
// 设置AQ的消息监听器
MessageAQConnection.setListener(listener);
// 初始化AQ连接
if (!MessageAQConnection.initFactory(aqConfig)) {
throw new RuntimeException("Message Oracle AQ initialization failed!");
}
// 建立连接
if (!MessageAQConnection.establishConnection(aqConfig)) {
throw new RuntimeException("Message Oracle AQ connection failed!");
}
}
}
}
package com.wangqq.jms;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @Title: MessageAQConfig.java
* @Description: ORACLE 消息队列配置
* @author wangqq
* @date 2020年6月28日 下午3:36:08
* @version 1.0
*/
@Component
public class MessageAQConfig {
/** 是否开启MessageAq功能 */
@Value("${queue.aq.enable}")
public Boolean enable;
/** 数据库用户名 */
@Value("${spring.datasource.username}")
public String userName;
/** 数据库密码 */
@Value("${spring.datasource.password}")
public String password;
/** 数据库地址url */
@Value("${spring.datasource.url}")
public String url;
/** 队列名称 */
@Value("${queue.aq.name}")
public String queue;
}
package com.wangqq.jms;
import javax.jms.Queue;
import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
import oracle.jms.AQjmsConnection;
import oracle.jms.AQjmsConnectionFactory;
import oracle.jms.AQjmsConsumer;
import oracle.jms.AQjmsSession;
/**
* @Title: MessageAQConnection.java
* @Description: AQ 连接
* @author wangqq
* @date 2020年6月28日 下午3:50:32
* @version 1.0
*/
@Slf4j
public class MessageAQConnection {
private static AQjmsConnectionFactory aQjmsConnectionFactory;
private static AQjmsConsumer aQjmsConsumer;
private static AQjmsSession aQjmsSession;
private static AQjmsConnection aQjmsConnection;
private static MessageAQListener listener;
/**
* 设置JMS监听器
*
* @param messageAqJmsListener
* @author wangqq
* @date 2020年7月6日 上午8:33:57
*/
public static void setListener(MessageAQListener messageAqJmsListener) {
listener = messageAqJmsListener;
}
/**
* 初始化 AQ 连接 Factory
*
* @param aqConfig 消息队列配置
* @return 是否成功
*/
public static boolean initFactory(MessageAQConfig aqConfig) {
try {
aQjmsConnectionFactory = new AQjmsConnectionFactory();
aQjmsConnectionFactory.setJdbcURL(aqConfig.url);
aQjmsConnectionFactory.setUsername(aqConfig.userName);
aQjmsConnectionFactory.setPassword(aqConfig.password);
return true;
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
}
/**
* 连接消息队列
*
* @param aqConfig 消息队列配置
* @return 是否成功
*/
public static boolean establishConnection(MessageAQConfig aqConfig) {
try {
aQjmsConnection = (AQjmsConnection) aQjmsConnectionFactory.createConnection();
aQjmsSession = (AQjmsSession) aQjmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
aQjmsConnection.start();
Queue queue = aQjmsSession.getQueue(aqConfig.userName, aqConfig.queue);
aQjmsConsumer = (AQjmsConsumer) aQjmsSession.createConsumer(queue, null, MessageORAData.getFactory(), null,
false);
aQjmsConsumer.setMessageListener(listener);
return true;
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
}
/**
* 关闭消息队列连接
*
* @return 是否成功
*/
public static boolean closeConnection() {
try {
aQjmsConsumer.close();
aQjmsSession.close();
aQjmsConnection.close();
return true;
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
}
}
package com.wangqq.bean;
import lombok.Builder;
import lombok.Data;
/**
* @Title: Test.java
* @Description: AQ 数据承载类
* @author wangqq
* @date 2021-01-20 16:19:16
* @version 1.0
*/
@Data
@Builder
public class Test {
private String param_1;
private String param_2;
}
package com.wangqq.jms;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Struct;
import com.wangqq.bean.Test;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import oracle.sql.Datum;
import oracle.sql.ORAData;
import oracle.sql.ORADataFactory;
/**
* @Title: MessageORAData.java
* @Description: 数据类型转换类
* @author synjones
* @date 2018年12月3日 上午11:29:50
* @version 1.0
*/
@Slf4j
@NoArgsConstructor
public class MessageORAData implements ORAData, ORADataFactory {
private Object[] rawData = new Object[8];
private static final MessageORAData MESSAGE_FACTORY = new MessageORAData();
public static ORADataFactory getFactory() {
return MESSAGE_FACTORY;
}
@Override
public ORAData create(Datum datum, int sqlType) throws SQLException {
if (datum == null) {
return null;
} else {
try {
MessageORAData payOraData = new MessageORAData();
Struct aStruct = (Struct) datum;
payOraData.rawData = aStruct.getAttributes();
return payOraData;
} catch (Exception e) {
log.error(e.getMessage(), e);
return null;
}
}
}
@Override
public Datum toDatum(Connection arg0) throws SQLException {
return null;
}
/**
* 消息内容解析并封装
*
* @return
* @author wangqq
* @date 2020年7月6日 上午8:38:01
*/
public Test getContent() {
try {
return Test.builder()
.param_1(rawData[0] == null ? null : rawData[0].toString())
.param_2(rawData[0] == null ? null : rawData[0].toString())
.build();
} catch (Exception e) {
log.error(e.getMessage(), e);
return null;
}
}
}
package com.wangqq.jms;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.springframework.stereotype.Component;
import com.wangqq.bean.Test;
import lombok.extern.slf4j.Slf4j;
import oracle.jms.AQjmsAdtMessage;
/**
* @Title: JMSListener.java
* @Description: JMS监听ORACLEAQ的队列消息
* @author wangqq
* @date 2020年6月28日 上午11:23:42
* @version 1.0
*/
@Slf4j
@Component
public class MessageAQListener implements MessageListener {
@Override
public void onMessage(Message message1) {
AQjmsAdtMessage adtMessage = (AQjmsAdtMessage)message1;
try {
MessageORAData payload = (MessageORAData)adtMessage.getAdtPayload();
// 获取消息内容
Test test = payload.getContent();
System.out.println(test.toString());
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
package com.wangqq.jms;
import java.util.Date;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.wangqq.mapper.MessageAqMapper;
import com.wangqq.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
/**
* @Title: MessageAQMonitor.java
* @Description: AQ 监控任务, 在AQ断开后重连
* @author wangqq
* @date 2020年6月28日 下午4:35:31
* @version 1.0
*/
@Slf4j
@Component
public class MessageAQMonitor {
@Autowired
private MessageAQConfig aqConfig;
@Autowired
private MessageAqMapper aqMapper;
@Scheduled(cron = "${queue.aq.cron}")
private void monitorJob() {
// 检查消息队列是否启用
if (!aqConfig.enable) {
return;
}
// 获取当前时间,并向前推5分钟
String formatDateTime = DateUtil.formatDate(new Date(System.currentTimeMillis() - 300000));
// 将该时间转为0时区的时间【数据库中存储的队列时间为0时区的时间】
String zeroZoneTime = DateUtil.timeConvert(formatDateTime, "+08:00", "+00:00", "yyyy-MM-dd HH:mm:ss");
// 查询是否存在5分钟以前的队列未被消费
int selectCount = aqMapper.selectCount(aqConfig.queue, zeroZoneTime);
if (selectCount != 0) {
// 若存在,则重新启动监听
if (MessageAQConnection.closeConnection()) {
log.info("--> AQ connection has been closed.");
if (MessageAQConnection.establishConnection(aqConfig)) {
log.info("--> AQ connection has been re-established.");
}
}
}
}
}
package com.wangqq.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
/**
* @Title: MessageAqMapper.java
* @Description: oracleAQ的查询
* @author wangqq
* @date 2020年6月28日 下午4:04:50
* @version 1.0
*/
@Mapper
public interface MessageAqMapper {
/**
*
* 查询数据库中的队列表中符合条件的队列的条数
*
* @param qName 队列名称
* @param minDatetime 队列入队的最小时间
* @return
* @author wangqq
* @date 2020-07-10 15:44:43
*/
@Select("select count(msgid) from T_QUEUE_TABLE t where t.q_name = #{qName,jdbcType=VARCHAR} "
+ "and to_char(cast(t.enq_time AS DATE), 'yyyy-MM-dd HH24:mi:ss') < #{minDatetime,jdbcType=VARCHAR}")
int selectCount(String qName, String minDatetime);
}
package com.wangqq.util;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
/**
* @Title: DateUtil.java
* @Description: 日期工具类
* @author wangqq
* @date 2018年10月29日 下午5:27:21
* @version 1.0
*/
public class DateUtil {
/**
* 字符串转date,默认格式yyyy-MM-dd HH:mm:ss
*
* @param source
* @return
*/
public static Date parseDate(String source) {
return parseDate(source, "yyyy-MM-dd HH:mm:ss");
}
/**
* 字符串转date
*
* @param source
* @param pattern
* 格式
* @return
*/
public static Date parseDate(String source, String pattern) {
if (source == null || source.equals("")) {
return null;
}
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
try {
return sdf.parse(source);
} catch (ParseException e) {
e.printStackTrace();
return null;
}
}
/**
* 格式化日期,默认格式yyyy-MM-dd HH:mm:ss
*
* @param date
* @return
*/
public static String formatDate(Date date) {
return formatDate(date, "yyyy-MM-dd HH:mm:ss");
}
/**
* 格式化日期
*
* @param date
* @param pattern
* 格式
* @return
*/
public static String formatDate(Date date, String pattern) {
if (date == null) {
return null;
}
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
return sdf.format(date);
}
/**
* 时区 时间转换方法:将传入的时间(可能为其他时区)转化成目标时区对应的时间
* @param sourceTime 时间格式必须为:yyyy-MM-dd HH:mm:ss
* @param sourceId 入参的时间的时区id 比如:+08:00
* @param targetId 要转换成目标时区id 比如:+09:00
* @param reFormat 返回格式 默认:yyyy-MM-dd HH:mm:ss
* @return string 转化时区后的时间
*/
public static String timeConvert(String sourceTime, String sourceId,
String targetId,String reFormat){
//校验入参是否合法
if (null == sourceId || "".equals(sourceId) || null == targetId
|| "".equals(targetId) || null == sourceTime
|| "".equals(sourceTime)){
return null;
}
if(reFormat == null || "".equals(reFormat)){
reFormat = "yyyy-MM-dd HH:mm:ss";
}
//校验 时间格式必须为:yyyy-MM-dd HH:mm:ss
String reg = "^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$";
if (!sourceTime.matches(reg)){
return null;
}
try{
//时间格式
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//根据入参原时区id,获取对应的timezone对象
TimeZone sourceTimeZone = TimeZone.getTimeZone("GMT"+sourceId);
//设置SimpleDateFormat时区为原时区(否则是本地默认时区),目的:用来将字符串sourceTime转化成原时区对应的date对象
df.setTimeZone(sourceTimeZone);
//将字符串sourceTime转化成原时区对应的date对象
java.util.Date sourceDate = df.parse(sourceTime);
//开始转化时区:根据目标时区id设置目标TimeZone
TimeZone targetTimeZone = TimeZone.getTimeZone("GMT"+targetId);
//设置SimpleDateFormat时区为目标时区(否则是本地默认时区),目的:用来将字符串sourceTime转化成目标时区对应的date对象
df.setTimeZone(targetTimeZone);
//得到目标时间字符串
String targetTime = df.format(sourceDate);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
java.util.Date date = sdf.parse(targetTime);
sdf = new SimpleDateFormat(reFormat);
return sdf.format(date);
}
catch (ParseException e){
e.printStackTrace();
}
return null;
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。