Giter VIP home page Giter VIP logo

o2k's Introduction

o2k 工具

o2ks社区版是一款高性能、“免费”的oracle数据订阅同步软件,支持从Oracle数据库的1521端口直接读取在线日志文件和归档日志文件,解析出DDL和DML操作,输出binlog event到kafka供下游应用程序进行数据订阅。

本软件仓库提供了Java、C++版本的读取o2k输出的示例代码JBinlogDumpK/JBinlogDumpS/binlogdumpK;提供了动态查询和修改o2k状态及配置的工具qdcadm;提供了o2k跟Flink结合的示例程序FlinkSample/frauddetection.

1. java binlogdump范例:读取并打印binlog

o2k从oracle解析归档日志文件和在线日志文件,生成binlog,并写入kafka,或文件,或socket。 JBinlogDump工具分别从kafka和binlog server读取binlog:

  • JBinlogDumpK: 从kafka读取binlog,反序列化成Java object,打印到控制台。
  • JBinlogDumpS: 从socket(binlog server)读取binlog,反序列化成Java object,打印到控制台。

1.1 编译

要求jdk版本: 不低于java 8

cd JBinlogDump

javac -cp .:lib/commons-cli-1.5.0.jar:lib/protobuf-3.6.1.jar:lib/kafka-clients-3.0.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-api-1.7.30.jar:lib/slf4j-log4j12-1.7.30.jar:lib/binlog-serializer.jar src/*.java

1.2 运行JBinlogDumpK: 从kafka读取binlog并打印出来

cd JBinlogDump

java  -cp .:lib/commons-cli-1.5.0.jar:lib/protobuf-3.6.1.jar:lib/kafka-clients-3.0.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-api-1.7.30.jar:lib/slf4j-log4j12-1.7.30.jar:lib/binlog-serializer.jar:src JBinlogDumpK -b 127.0.0.1:9092 -s schema1,schema2

参数说明:

-h 显示帮助信息

-b kafka.bootstrap.server, 默认是127.0.0.1:9092

-s 指定oracle schema,多个schema用逗号分隔。o2k会将每一个schema下的所有table的binlog写入一个kafka topic,所以,必须指定要读取哪些schema的binlog。

1.3 运行JBinlogDumpS: 从socket读取binlog并打印出来

cd JBinlogDump

java  -cp .:lib/commons-cli-1.5.0.jar:lib/protobuf-3.6.1.jar:lib/kafka-clients-3.0.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-api-1.7.30.jar:lib/slf4j-log4j12-1.7.30.jar:lib/binlog-serializer.jar:src JBinlogDumpS -a 127.0.0.1:9191

参数说明:

-h 显示帮助信息

-a binlog server的地址和端口号,默认是127.0.0.1:9191。当o2k输出到socket时,默认会绑定9191端口。

2. C++ binlogdump范例:读取并打印binlog

2.1 编译

2.1.1 环境要求

  • gcc版本: 不低于4.8
  • protobuf lib: 3.6
  • protobuf dev: 3.6
  • protobuf compile: 3.6
  • librdkafka:

安装protobuf:

centos:

yum install protobuf protobuf-compiler protobuf-devel

ubuntu:

sudo apt install libprotobuf protobuf-compiler libprotobuf-dev

安装rdkafka:

[https://github.com/edenhill/librdkafka]

2.1.2 生成protobuf序列化代码

cp proto/* binlogdump
cd binlogdump

protoc --cpp_out=. *.proto

2.1.3 编译binlogdumpK

cd binlogdump

g++ -o binlogdumpK *.cpp *.cc -I. -I/usr/local/include/librdkafka -lprotobuf -lrdkafka++

2.2 运行binlogdumpK

./binlogdumpK -b 127.0.0.1:9092 -n defaultapp -s qbench

3. qdcadm: o2k交互管理工具

3.1 编译

3.1.1 环境要求

同binlogdump

3.1.2 编译qdcadm

cd qdcadm

g++ -o qdcadm *.cpp *.cc -I. -lprotobuf -lpthread

3.2 运行qdcadm

cd qdcadm
./qdcadm 127.0.0.1:9193 woqutech list
./qdcadm 127.0.0.1:9193 woqutech get_conf binlog.output.dest

4.联系方式

微信群:扫描下方二维码,添加企业微信,发送“o2k社区交流”后,拉入群聊

o2k拉群二维码

o2k's People

Contributors

woqutech-qdecoder avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

o2k's Issues

使用QDeoder + Flink实时检测信用卡欺诈交易

使用QDeoder + Flink实时检测信用卡欺诈交易

0. 背景

在当今数字时代,信用卡欺诈行为越来越被重视。 罪犯可以通过诈骗或者入侵安全级别较低系统来盗窃信用卡卡号。 用盗得的信用卡进行很小额度的例如一美元或者更小额度的消费进行测试。 如果测试消费成功,那么他们就会用这个信用卡进行大笔消费,来购买一些他们希望得到的,或者可以倒卖的财物。

本文将介绍如何建立一个针对可疑信用卡交易行为的反欺诈检测系统。通过本文你将了解到:

  • QDecoder 如何快速获取oracle数据库的交易数据。
  • Flink 如何为我们实现复杂业务逻辑并实时执行。

QDecoder是沃趣科技研发的、基于Oracle redo日志进行二进制反解析工具,能够及时将oracle数据库的变更解析出来,形成binlog;解析的结果以canal的protobuf的形式直接写入到kafka或者socket。关于QDecoder的更多介绍,请参考:https://hub.docker.com/r/woqutech/qdecoder

Apache Flink 提供了 DataStream API 来实现稳定可靠的、有状态的流处理应用程序。 Flink 支持对状态和时间的细粒度控制,以此来实现复杂的事件驱动数据处理系统。

下面将介绍基于QDecoder + Flink实现信用卡欺诈交易实时检测报警程序。

1. 架构

架构图

QDecoder实时获取Oracle的redo log,将感兴趣的数据解析出来,写入kafka。
Flink程序(FraudDetection)使用flink-connector-kafka实时从kafka获取交易数据,使用com.alibaba.otter.canal.protocol反序列化binlog,然后进行流式计算,识别出可能的欺诈交易,并输出警告。

使用QDecoder解析Oracle的redo log,具有以下优点:

  • QDecoder只须读取在线日志文件和归档日志文件,不会更改任何oracle的数据,对用户没有任何影响,非常安全可靠。
  • QDecoder对用户事务没有侵入性,无须更改应用逻辑,无须针对QDecoder做任何特殊的schema设计。
  • QDecoder能实时地解析redo log,保证下游程序能及时获得最新的交易数据。
  • QDecoder支持oracle的各种高级功能,保证解析出的数据完整一致。
  • QDecoder简单易用,几分钟就可以配置好。

2. 配置并启动QDecoder

2.1 在oracle数据库中准备交易表格:account

create user qdecoder identified by qdecoder default tablespace USERS quota unlimited on USERS;
grant connect,resource to qdecoder;
conn qdecoder/qdecoder;
create table account(accountid int primary key, balance number);

2.2 配置并启动QDecoder

运行docker命令,即可启动QDecoder:

docker run -it --name=qdecoder -p 9191:9191 -p 9092:9092 --pull always registry.cn-hangzhou.aliyuncs.com/woqutech/qdecoder

注意:如果docker版本比较低,不支持--pull always选项,则请手动更新docker image,保证运行qdecoder的最新版本:

docker pull registry.cn-hangzhou.aliyuncs.com/woqutech/qdecoder
docker run -it --name=qdecoder -p 9191:9191 -p 9092:9092 registry.cn-hangzhou.aliyuncs.com/woqutech/qdecoder

根据提示配置QDecoder,更多信息可参考 https://hub.docker.com/r/woqutech/qdecoder

以下配置需要特别注意一下:

  • 配置项1.1中列出的sql,请以dba权限在oracle中执行,这将配置QDecoder查询系统表需要的权限。
  • 配置项2.1: 输入将要检测的表:qdecoder.account
  • 配置项3.1: 选择输出到kafka, bootstrap.servers可以不输入,直接在容器中启动kafka。account表的变更将写入topic: defaultapp.qdecoder.binlog.qdecoder

配置如下:
配置示例

等QDecoder启动后,可以按照提示运行binlogdumpK,从kafka读取binlog并打印出来。

现在更新account表,看binlogdumpK的输出:

insert into account values(1,10000);
insert into account values(2,20000);
insert into account values(3,30000);
commit;

如果网络比较快,应该马上就能看到binlogdumpK输出了相应的binlog。

注意: binlogdumpK只是为了观察一下QDecoder的输出,你可以随时关掉它,这并不影响QDecoder和Flink程序的运行。

现在,QDecoder已经正常工作了,接下来,我们写一个Flink程序,读取binlog并检测欺诈交易。

3. 编写Flink程序进行欺诈检测

下面的Flink程序将检测account表的每一笔交易,若发现一个帐户在1分钟内,先出现了一笔小交易(小于1),后面又出现了一笔大交易(大于500),则认为出现了欺诈交易,立即输出警告。

完整代码在github: https://github.com/woqutech/qdecoder/tree/main/FlinkSample/frauddetection

3.1 下载代码

git clone https://github.com/woqutech/qdecoder.git
cd qdecoder/FlinkSample/frauddetection

3.2 运行程序

frauddetection是一个maven创建的项目,有pom.xml项目文件,可以导入各种IDE,进行调试和运行。

3.2.1 用intellij IDEA运行frauddetection程序

打开项目

开始界面:open or import -> 选择frauddetection目录
或者
菜单: file/open -> 选择frauddetection目录

运行程序

菜单: run -> run 'FraudDetectionJob'

注意:如果报告slf4j重复,且有大量的log输出,请在module/dependencies中删除ch.qos.logback:logback-classic和ch.qos.logback:logback-core。

菜单: file -> Project Structure -> Modules -> dependencies, 删除上述依赖。

3.3 更新account.balance,模拟交易,观察Flink程序的输出

update account set balance = balance - 0.1 where accountid = 1;
commit;
update account set balance = balance - 0.2 where accountid = 1;
commit;
update account set balance = balance + 100 where accountid = 2;
commit;
update account set balance = balance - 501 where accountid = 1;
commit;
update account set balance = balance - 200 where accountid = 2;
commit;

在一分钟内,account-1先出现了小于1的变更,后面又出现了大于500的变更,则识别为欺诈事务。
执行完上述SQL,Flink程序会立即输出:

21:11:20,107 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=1}

表示accountid=1的帐号检测到欺诈交易。

3.4 代码解析

Flink程序(FraudDetection)共有三个类,位于src/main/java/spendreport:

  • FraudDetectionJob: Flink程序主控类,main方法中组装并运行了Flink程序。
  • BinlogTransactionSchema: 反序列化QDecoder输出的binlog,生成Transaction对象。
  • FraudDetector: 进行流式计算,检测欺诈。

程序的基本框架来自 https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/try-flink/datastream/,所不同的是,这里使用flink-connector-kafka从kafka读取QDecoder写入的binlog,然后用com.alibaba.otter.canal.protocol反序列化并组装成Transaction对象,最后交给FraudDetector进行处理,识别欺诈交易。

由于QDecoder输出的binlog采用了和阿里巴巴的canal兼容的格式,所以可以直接使用canal.protocol包反序列化之。

使用外部组件的方法也很简单,在pom.xml中加入依赖即可:

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.11</artifactId>
			<version>1.14.3</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba.otter</groupId>
			<artifactId>canal.protocol</artifactId>
			<version>1.1.3</version>
		</dependency>

3.4.1 FraudDetectionJob.main:

main方法中组装Flink程序:

  • 创建Flink流执行环境:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 创建FlinkKafkaConsumer,为Flink程序提供数据流:
FlinkKafkaConsumer<Transaction> kafkaSource = new FlinkKafkaConsumer<Transaction>("defaultapp.qdecoder.binlog.qdecoder", new BinlogTransactionSchema(), properties);
  • 将kafka数据源加入执行环境:
DataStream<Transaction> transactions = env
				.addSource(kafkaSource)
				.name("transactions");
  • 为数据流增加处理功能:用FraudDetector分析每一笔交易,按accountid分区,并行计算
DataStream<Alert> alerts = transactions
			.keyBy(Transaction::getAccountId)
			.process(new FraudDetector())
			.name("fraud-detector");
  • 最后,启动Flink程序
env.execute("Fraud Detection");

完整代码如下:

public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 创建FlinkKafkaConsumer,用BinlogTransactionSchema反序列化
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
		properties.setProperty("group.id", "flink.test");
		
		FlinkKafkaConsumer<Transaction> kafkaSource = new FlinkKafkaConsumer<Transaction>("defaultapp.qdecoder.binlog.qdecoder", new BinlogTransactionSchema(), properties);
		kafkaSource.setStartFromEarliest();

		DataStream<Transaction> transactions = env
				.addSource(kafkaSource)
				.name("transactions");

		DataStream<Alert> alerts = transactions
			.keyBy(Transaction::getAccountId)
			.process(new FraudDetector())
			.name("fraud-detector");

		alerts
			.addSink(new AlertSink())
			.name("send-alerts");

		env.execute("Fraud Detection");
	}

3.4.2 BinlogTransactionSchema.deserialize:

使用com.alibaba.otter.canal.protocol反序列化binlog, 计算balance的变化,生成org.apache.flink.walkthrough.common.entity.Transaction对象。

主要代码如下:

// 反序列化Entry
CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(binlog);
// 获取表名
entry.getHeader().getTableName();
// 获取Entry type: ROWDATA|TRANSACTIONBEGIN|TRANSACTIONEND|...
entry.getEntryType();
// 获取row change
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
// 获取Event type: INSERT|UPDATE|DELTE|...
rowChange.getEventType();
// 获取执行时间
long executeTimeMs = entry.getHeader().getExecuteTime();
// 获取row data
CanalEntry.RowData rowData = rowChange.getRowDatas(0);

// 获取旧值
for (CanalEntry.Column col : rowData.getBeforeColumnsList()) {
    if (col.getName().equalsIgnoreCase("accountid")) {
        oldRow.accountId = Long.parseLong(col.getValue());
    } else if (col.getName().equalsIgnoreCase("balance")) {
        oldRow.balance = Double.parseDouble(col.getValue());
    }
}

// 获取新值
for (CanalEntry.Column col : rowData.getAfterColumnsList()) {
    if (col.getName().equalsIgnoreCase("accountid")) {
        newRow.accountId = Long.parseLong(col.getValue());
    } else if (col.getName().equalsIgnoreCase("balance")) {
        newRow.balance = Double.parseDouble(col.getValue());
    }
}

// 创建transaction
new Transaction(newRow.accountId, executeTimeMs, Math.abs(newRow.balance-oldRow.balance));

3.4.3 FraudDetector.processElement

处理每一个Transaction对象,识别同一帐号可能存在的欺诈交易。更多细节请参考https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/try-flink/datastream/

4. 总结

以上示例展示了QDecoder在Flink流式计算中的基本用法:

使用QDecoder实时地获取oracle的变更,并将变更写入kafka;然后由Flink从kafka获取交易详情,进行实时计算,及时输出计算结果。

对于拥有大量oracle交易数据库的实时交易系统来讲,使用QDecoder快速获取数据变更,为后端的大数据计算提供了高效、稳定的数据流。
QDecoder简单易用的特性,以及对数据库和应用都无侵入的设计,非常易于搭建基于oracle数据源的大数据分析系统。

你好,疑问请教

目前我使用服务器 A 运行 o2k 启动提示 parser 1 不存在 parser.log 如下


ERROR :2023-03-10 11:09:29 : file=AsmFetch.cpp,line=243,func=ConnectAsm : Failed to connect source database: sys@(DESCRIPTION= (ENABLE=BROKEN)(LOAD_BALANCE=OFF) (FAILOVER=ON) (ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.0.144)(PORT=1521)))(CONNECT_DATA= (SERVICE_NAME=+ASM2) (SERVER=DEDICATED) (FAILOVER_MODE= (TYPE=SELECT) (METHOD=BASIC) (RETRIES=60) (DELAY=5))))

ERROR :2023-03-10 11:09:29 : file=OraParser.cpp,line=1010,func=Init : init CAsmFetch: 0 error.
ERROR :2023-03-10 11:09:29 : file=OraParser.cpp,line=1426,func=main : parser init error, exit !!!
INFO  :2023-03-10 11:09:29 : file=OraParser.cpp,line=1427,func=main : ========== parser(pid: 178) is stop. ==========

因为我的 ASM 还有点问题,所以我查看 wiki 发现有一条解释

若oracle的在线日志或归档日志存储于本地文件系统,则parser必须部署到oracle节点上才能读取日志文件。

问题:
请问这个 parser 组件如何部署到 oracle 节点上,是在该 oracle 节点运行 o2k 并暴露 9192 端口吗?如果是的话那服务器A运行的o2k需要修改哪些配置?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.