点亮 ⭐️ Star · 照亮开源之路
GitHub:https://github.com/apache/incubator-seatunnel
无论是甲方还是乙方,我们在采集数据进行数仓模型建设时,企业的ERP一旦切换到SAP系统中,就会遇到较高的安全挑战、技术门槛和产品壁垒。
**安全挑战问题在于:**传统数仓模式离线接入SAP HANA,对于多集团公司又涉及到数据权限和隔离等安全问题,一般集团大型企业不太会开放HANA数据库进行接入,同时SAP的业务表逻辑也比较复杂;
**技术门槛在于:**我们要有对应的java开发工程师,每一个数据表就要开发一个接口,而且接口的传输速度也很慢,只能适合小批量的数据接入;
产品壁垒在于:SAP的闭环管理只能购入SAP的BW产品实现整体数据的快速接入和模型建设,这种模式就比较适合 “ALL IN SAP ”的企业里面,所有的数据处理和分析都是基于SAP产品进行闭环的开发管理,但是弊端依然明显,一旦有部分产品脱离SAP,那数据团队以及运维的成本都是翻倍增加 的,也无法实现企业降本增效的目的;
实际情况是在企业内部的各种业务系统异常复杂,尤其是各种各样的ERP系统,业务中台系统,线上平台系统,私有化部署的,SAAS模式的,要一个通用的工具去实现各种数据源的采集接入 ,前几年国内比较主流的就是Kettle,再后来是DATAX;但他们都侧重于离线处理,对于实时数据接入也是费时费力,或者基本不能实现;
基于以上复杂场景,在试用了市场上主流的开源的产品之后,我们锁定了SeaTunnel ,按照从简单到复杂的接入,分步骤实现了离线数据接入,实时数据(Kafka)接入,数据在Hadoop生态和Clickhouse之间的衔接打通,在验证了上述的稳定性和高速度 之后,我们内部决定开发基于SeaTunnel的SAP RFC接口,完全彻底的打通企业内部数据采集的最后一个壁垒;
首先开发BaseStaticInput插件 。BaseStaticInput是个abstract class,我们只要继承并实现它就可以。class SapRfcInput extends BaseStaticInput{
var config = ConfigFactory.empty()
override def setConfig(config: Config): Unit = {
this.config = config
}
override def getConfig(): Config = config
override def checkConfig(): (Boolean, String) = {
}
override def getDataset(spark: SparkSession): Dataset[Row] = {
}
}
其中的关键点就是要实现getDataSet函数 ,这个函数的返回值是Dataset[Row] 。
**怎么才能得到Dataset[Row]?**要么直接通过seq或者list类似的数据构造,要么通过RDD构造。
如果直接通过数据构造,在数据量过大时会产生内存溢出,这种方法在数据量很小的时候是可以的。在数据量大的时候,需要一种惰性的方式获取数据,得实现自己的RDD。class SapRfcRDD(sc: SparkContext, config: Config) extends RDD[Row](sc, Nil) with Logging{
override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] {
override def hasNext: Boolean = {
}
override def next(): Row = {
}
}
override protected def getPartitions: Array[Partition] = {
}
}
SapRfcRDD 构造函数我们自己添加了一个参数config,为什么添加它?下文会说明。
compute 顾名思义就是用于计算出数据的,返回值是迭代器,说明它是一种惰性的获取数据的方式。
实现一个迭代器在此之前首先需要实现hasNext与next方法 ,hasNext用于辨别是否还有数据, next用于产生数据。
getPartitions用于获得分区信息。
如何实现这两个函数呢? 这个实现就得和如何获得SAP RFC接口的数据结合起来看。
我们获得SAP RFC接口数据一般有以下关键步骤: # 根据相关RFC接口信息获得JcoTable
getJcoTable(config: Config): JCoTable
# 获得数据行数
table.getNumRows
# 设置行
table.setRow(curIndex)
# 根据字段名取数据
val data = columns.map(column => {
table.getString(column)
})
compute中的hasNext方法是肯定和table.getNumRows相关的 , next方法是肯定和table.setRow方法相关的, 那我们得获得JcoTable对象,这就和上面提到SapRfcRDD的构造函数的第二个参数config联系起来了,通过config,我们才能获得JcoTable对象。
**那为什么不直接通过构造函数参数将JcoTable注入呢?**这涉及到RDD是分布式数据集,它会被序列化之后在各个节点之间传递,SapRfcRDD构造函数的参数是必须能够安全序列化的,但JcoTable序列化会产生内存溢出,当然是否溢出是和JcoTable关联的数据大小有关。
**那getPartitions是干嘛的呢?**看下来好像不需要它也是可以的。如果你仅仅想把数据分成一个分区的话,getPartitons确实是没什么用的。
但是如果你要把数据分成多个分区,加快它的处理速度,getPartitions的实现就很重要了。
而且要特别注意compute的split参数,它其实就是getPartitions返回的其中一个分区,compute的hasNext与next的实现 和它是息息相关的。trait Partition extends scala.AnyRef with scala.Serializable {
def index : scala.Int
override def hashCode() : scala.Int = { /* compiled code */ }
override def equals(other : scala.Any) : scala.Boolean = { /* compiled code */ }
}
class RowPartition(idx: Int, val start: Int, val end: Int) extends Partition {
override def index: Int = idx
override def toString: String = s"RowPartition{index: ${idx}, start: ${start}, end: ${end}}"
}
getPartitions的返回值是Array[Partitions] ,Partition是一个接口,实现它是非常简单的。
我们给RowPartiton构造函数添加了两个参数start与end ,即JcoTable的开始行数与结束行数,左闭又开。
比如说整个接口的数据是2000行,我们给它分成两个分区,就类似于 RowPartition {index: 0, start: 0, end: 1000}, RowPartition {index: 0, start: 1000, end: 2000}。
在compute中split就是RowPartiton的实例, 通过split的start与end我们可以很容易的实现hasNext, next。 override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] {
val columns = config.getStringList(config.getString("table")).asScala
val rowPartition: RowPartition = split.asInstanceOf[RowPartition]
val table = SapRfc.getJcoTable(config)
val tableRowData = new TableRowData(columns, table, rowPartition.index, rowPartition.start, rowPartition.end)
println(tableRowData)
override def hasNext: Boolean = {
tableRowData.hasNext()
}
override def next(): Row = tableRowData.next()
}
class TableRowData(val columns: Seq[String], val table: JCoTable, val partitionId: Int, val start: Int, val end: Int) {
var curIndex = start
def hasNext(): Boolean = {
curIndex < end
}
def next(): Row = {
table.setRow(curIndex)
val data = columns.map(column => {
table.getString(column)
})
curIndex += 1
Row.fromSeq(data)
}
override def toString: String = s"TableRowData{partitionId: ${partitionId}, start: ${start}, end: ${end}, columns: ${columns} }"
}
实现完SapRfcRDD, 实现getDataSet就非常容易了。
最终我们实现了SAP RFC接口的数据接入,包含了2种模式ASHOST 和MSHOST (注:The MSHOST string is useful since it will give you failover capabilities in the process server connection. Also it can load balance the CPS connections (not the jobs, they are load balanced based on other metrics) to your remote system ),极大的简化了SAP数据的采集时间,由原来java模式的一接口一开发实现了现在的一接口一配置,附input示例:input {
org.interestinglab.waterdrop.input.SapRfc {
jco.client.mshost = "XXXXXX"
jco.client.r3name = "XXX"
jco.client.client = "XXX"
jco.client.user = "XXX"
jco.client.passwd = "XXX"
jco.client.lang = "ZH"
jco.client.group="PUBLIC"
function = "FUNXXX"
params = ["IV_DDATE", ""${rfc_date}""]
table = "TTXXX"
TTXXX= ["col1","col2","col3"]
partition = 4
result_table_name = "res_tt"
}
}
input {
org.interestinglab.waterdrop.input.SapRfc {
jco.client.ashost = "XXXX"
jco.client.sysnr = "XX"
jco.client.client = "XX"
jco.client.user = "XX"
jco.client.passwd = "XXX"
jco.client.lang = "ZH"
function = "FUNXXX"
params = ["DDATE", ""${rfc_date}""]
table = "TABLEXXXX"
TABLEXXXX = ["col1","col2","col3"]
partition = 4
result_table_name = "res_tt"
}
}
参数配置包含三部分,第一部分端口的访问信息,第二部分是sap内部的函数以及传递参数、表名称以及表字段,第三部分是partition 是spark的分区数配置;
通过上述配置,我们获取60万条左右sap数据 (受限sap控制条件只能按天查询),从启动job到数据插入hive只需要2分钟即可,整个SAP数据的接入开发时间由原来的天缩短到小时级别(包含参数配置,基本校验)。
作者:韩山峰/皇甫新义金红叶纸业集团大数据开发工程师
专注于大数据平台建设、数据仓库、数据模型建设、数据可视化方向,对市场上常见的数据集成框架以及引擎有一定的了解。
Apache SeaTunnel
// 保持联络 //
来,和社区一同成长!
Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台。
仓库地址: https://github.com/apache/incubator-seatunnel
**网址:**https://seatunnel.apache.org/
**Proposal:**https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal
**Apache SeaTunnel(Incubating) 2.1.0 下载地址:**https://seatunnel.apache.org/download
衷心欢迎更多人加入!
能够进入 Apache 孵化器,SeaTunnel(原 Waterdrop) 新的路程才刚刚开始,但社区的发展壮大需要更多人的加入。我们相信,在**「Community Over Code」(社区大于代码)、 「Open and Cooperation」(开放协作)、 「Meritocracy」**(精英管理)、以及「**多样性与共识决策」**等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!
我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!
**提交问题和建议:**https://github.com/apache/incubator-seatunnel/issues
**贡献代码:**https://github.com/apache/incubator-seatunnel/pulls
订阅社区开发邮件列表 : dev-subscribe@seatunnel.apache.org
**开发邮件列表:**dev@seatunnel.apache.org
**加入 Slack:**https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ
关注 Twitter: https://twitter.com/ASFSeaTunnel