`
sheungxin
  • 浏览: 103447 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

基于oracle的增量数据采集实现总结

阅读更多
  • 项目打包方案
在“基于oracle的增量数据采集”一文中提出了基于触发器》物化视图》存储过程》java source》外部程序数据采集方案。本文初步对其进行了实现,利用maven-assembly-plugin进行打包,输出结构如下:bin、conf、lib,分别存放命令文件、配置文件、jar包,需注意在bin目录下命令文件中把conf、lib加入classpath中,详见start.bat、clear.bat
@echo off & setlocal enabledelayedexpansion

set LIB_JARS=""
cd ..\lib
for %%i in (*) do set LIB_JARS=!LIB_JARS!;..\lib\%%i
cd ..\bin

java -Xms64m -Xmx1024m -classpath ..\conf;%LIB_JARS% com.service.data.sync.oracle.producer.SyncDataEnv init
goto end

:end
pause
目前仅配置命令文件start.bat、clear.bat,start.bat:启动初始化数据同步环境(创建java source、存储过程、物化视图、触发器),clear.bat:清除数据同步环境(删除java source、存储过程、物化视图、触发器),clear.bat只需把上述命令中init替换为clear即可

maven-jar-plugin,也是一个maven打包插件,可加入lib依赖、指定启动类等。使用java -jar 【jar包】 【args】 命令启动,本项目使用assembly即可

  • 需要注意事项
a、配置文件中指定需要同步的表,执行start.bat自动为每个表建立物化视图和触发器,已存在忽略
b、目前不支持二进制、单条数据过长,因为在触发器中拼接的数据类型为varchar2,限制4000;需要特殊处理,未验证
c、向外发布数据应采用异步发送方案,否则会影响业务库的数据提交。java source执行完,业务库中的事务方可提交成功,已验证
d、触发器创建在物化视图的原因在于,在表上建立触发器,未提交已触发。物化视图在未提交前存在物化视图日志中,不会触发
e、尽量减少jar的使用,避免oracle导入jar过多
f、检查是否安装OracleJVM,用sys用户执行"select * from dba_registry where comp_id='JAVAVM'",没有记录表示未安装,用database configuration assistant安装java组件或者执行$ORACLE_HOME/javavm/install/initjvm.sql脚本
g、在初始化数据同步环境前需在oracle中添加java source所依赖的jar包,目前最简方案把本项目中AbstractSend、HttpUrlSend、SyncDataRunner三个类打包,添加到oracle中;仅使用HttpURLConnection向外发数据,不依赖任何java环境外的jar,仅用于测试;正式环境应考虑使用其它异步发送方案,例如消息队列
loadjava -r -f -verbose -resolve -user username/password xxx.jar
loadjava -r -f -user username/password xxx.class
dropjava -r -f -user [option_list] file_list
该命令需要直接在数据库本机的命令行中执行,不能在plsql中执行

  • 代码实现
package com.service.data.sync.oracle.producer.send;

/**
 * 把监控获取数据向外发送,
 * @author sheungxin
 *
 */
public abstract class AbstractSend {
	
	/**
	 * 发送消息
	 * @param message
	 */
	public void send(String message){
		beforeSend(message);
		exeSend(message);
		afterSend(message);
	}
	
	/**
	 * 发送消息前执行
	 * @param message
	 */
	private void beforeSend(String message){
		//do something
	}
	
	/**
	 * 实际发送消息实现类
	 * @param message
	 */
	public abstract boolean exeSend(String message);

	/**
	 * 发送消息后执行
	 * @param message
	 */
	private void afterSend(String message){
		//do something
	}
}
消息发送抽象类,把监控的数据变化信息发送出去,可在发送前后做一些操作
package com.service.data.sync.oracle.producer.send;

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

/**
 * 使用HttpURLConnection向外发送消息,GET方式,消息有长度限制,仅用于测试
 * @author sheungxin
 *
 */
public class HttpUrlSend extends AbstractSend{
	
	private static final String sendPath="http://192.168.19.99:8181/jeesite/f/list-7.html?params=";
			
	public boolean exeSend(String message){
		boolean flag=true;
		try{
			URL url = new URL(sendPath);
			HttpURLConnection conn = (HttpURLConnection) url.openConnection();
			conn.setRequestMethod("GET");
			conn.setDoInput(true);
			InputStream is = conn.getInputStream();
			is.close();
		}catch(Exception e){
			e.printStackTrace();
			flag=false;
		}
		return flag;
	}

}
本消息发送实现类仅用于测试,不建议使用。在注意事项中提到过,需改用异步发送机制,避免对业务库事物提交产生影响
package com.service.data.sync.oracle.producer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.service.data.sync.oracle.producer.send.AbstractSend;
import com.service.data.sync.oracle.producer.send.HttpUrlSend;

/**
 * 数据同步执行类
 * @author sheungxin
 *
 */
public class SyncDataRunner {
	
	private static final BlockingQueue<String> taskQueue=new LinkedBlockingQueue<String>();
	
	/**
	 * 向队列中添加待发送的消息,队列满后返回false,LinkedBlockingQueue默认Integer.MAX_VALUE
	 * @param message
	 */
	public static boolean addTask(String message){
		return taskQueue.offer(message);
	}
	
	/**
	 * 消费队列中数据,向外发送消息
	 */
	public static void executeTask(){
		AbstractSend httpSend=new HttpUrlSend();
		while(true){
			String message;
			try {
				//堵塞获取待发送消息
				message = taskQueue.take();
				httpSend.send(message);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}
数据同步实际操作类,仅提供两个方法,addTask用于把监控到的数据放入队列中,在oracle中的java source代码中调用;executeTask用于把队列中的消息向外发送,此处仅单线程向外发送,可考虑多线程,但需注意对oracle性能的影响。具体根据业务情况,单线程+MQ应该也可以满足业务需求。另外,消息存放在队列中,异常情况下有可能会丢失,需要针对可能发生的异常进行特殊处理

关于java代码操作java source、存储过程、物化视图、触发器,这里就不贴代码,主要介绍遇到的一些坑。
1、在创建触发器时,拼接数据需要用到:old、:new,分别为触发事件之前和之后的对象,一直报错,无法创建成功。但是把sql拷贝到plsql可正常执行,通过排除法定位是“:old、:new”引起的。直接使用的JDBC,替换PrepareStatement改用Statement后可以了,PrepareStatement会进行预编译,一直不通过
2、创建java source时,一直异常,也是拷贝到plsql中可成功执行。最后解决方案:Statement.setEscapeProcessing(false),如果参数为true,则驱动程序在把SQL语句发给数据库前进行转义替换,否则让数据库自己处理

  • 其它思路
add2ws 写道
用得着这么麻烦么,直接用kettle插入更新,配合oracle触发器省时省力
根据上述意见,调研了kettle的增量同步方案:
1、全量比对取增量
2、使用时间戳进行数据增量更新
3、使用触发器+快照表进行数据增量更新
方案1,全量比对性能肯定不佳;方案2,需要时间戳,原有业务库不一定支持;方案3,使用快照表记录数据变化情况。网上有部分方案提出:快照表仅记录修改、删除操作,定时删除目标表在快照表中有的数据,再复制源表中没有的数据。这样删除修改数据再添加,有些业务不可取。这样的好处我的理解是可以批量处理数据,若根据操作类型逐条处理效率过低。所以我们需要根据实际业务场景去均衡实时性、性能、一致性、复杂度等去制定方案。
所以上文中的实现方案即替换为:触发器+快照表+快照表扫描程序(也可考虑用kettle),好处在于与数据库耦合度降低,但实时性、效率可能就差一些

以下这篇帖子对物化视图和ETL进行了讨论,适合自己即可,可以参考下:
引用
0
0
分享到:
评论
2 楼 sheungxin 2017-02-27  
@add2ws 我试下,多谢!
1 楼 add2ws 2017-02-27  
用得着这么麻烦么,直接用kettle插入更新,配合oracle触发器省时省力

相关推荐

Global site tag (gtag.js) - Google Analytics