First Look at SeaTunnel (1): Installation and Configuration
Background
SeaTunnel is a data integration platform for data synchronization; its jobs run on top of compute engines such as Spark and Flink, which is why both are set up below.
First Look
Environment Setup
Spark
SeaTunnel supports Spark versions in the range (required version >= 2 and version < 3.x) [7].
Download Spark:
wget https://mirrors.aliyun.com/apache/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
The latest release, 3.2.1, is not supported by SeaTunnel:
wget https://mirrors.aliyun.com/apache/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
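After unpacking the supported 2.4.8 build, a quick sanity check confirms the version falls inside SeaTunnel's supported range (file names come from the download above):
tar -xzf spark-2.4.8-bin-hadoop2.7.tgz
cd spark-2.4.8-bin-hadoop2.7
./bin/spark-submit --version    # should report version 2.4.8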
A starter example [4]:
./bin/run-example SparkPi 10
Result:
Pi is roughly 3.14019114019114
Flink
Download Flink. SeaTunnel supports Flink versions in the range (required version >= 1.9.0 and version < 1.14.x) [7]:
wget https://mirrors.aliyun.com/apache/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
The 1.15.0 release below is not supported by SeaTunnel:
wget https://mirrors.aliyun.com/apache/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
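Likewise, unpack the supported build and confirm the version before wiring it into SeaTunnel:
tar -xzf flink-1.13.6-bin-scala_2.12.tgz
cd flink-1.13.6
./bin/flink --version    # should report Version: 1.13.6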
A starter example [6]; note that [6] has many more examples worth a look.
Start the cluster:
$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host blue.local.
Starting taskexecutor daemon on host blue.local.
Submit a job:
./bin/flink run examples/streaming/WordCount.jar --input README.txt
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID d1f4e9818091919a7770aedc6d3de669
Program execution finished
Job with JobID d1f4e9818091919a7770aedc6d3de669 has finished.
Job Runtime: 249 ms
You can monitor the cluster status and running jobs through Flink's Web UI (http://localhost:8081 by default).
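The same information is also exposed by Flink's REST API on that port, for example:
curl http://localhost:8081/jobs    # list jobs and their status
# {"jobs":[{"id":"d1f4e9818091919a7770aedc6d3de669","status":"FINISHED"}]}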
Check the result:
tail log/flink-*-taskexecutor-*.out
(section,1)
(740,1)
(13,1)
(for,4)
(both,1)
(object,1)
(code,1)
(and,7)
(source,1)
(code,2)
Stop the cluster:
$ ./bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 395) on host blue.local.
Stopping standalonesession daemon (pid: 99940) on host blue.local.
SeaTunnel Setup
Download
New version:
wget https://www.apache.org/dyn/closer.lua/incubator/seatunnel/2.1.1/apache-seatunnel-incubating-2.1.1-bin.tar.gz
Old version:
wget https://github.com/apache/incubator-seatunnel/releases/download/v1.5.7/seatunnel-1.5.7.zip
Building from source is recommended instead: the source tree is only about 16 MB, so it is workable even on a poor connection.
git clone https://gitee.com/mirrors/seatunnel.git
cd seatunnel
git checkout 2.1.1-release
mvn clean package -Dmaven.test.skip=true -Dtests.skip=True
Build output:
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14:31 min
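After a successful build, the binary package sits under the dist module's target directory; a quick way to locate it (exact paths can vary by version):
find . -path "*target*" -name "apache-seatunnel*bin.tar.gz"
# e.g. ./seatunnel-dist/target/apache-seatunnel-incubating-2.1.1-bin.tar.gz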
Quick Start
Create a directory, collect the corresponding files from the project's build output, and arrange them as shown below:
├── bin
│ ├── start-seatunnel-flink.sh
│ └── start-seatunnel-spark.sh
├── config
│ ├── flink-application.conf
│ ├── flink.batch.conf.template
│ ├── flink.sql.conf.template
│ ├── flink.streaming.conf.template
│ ├── seatunnel-env.sh
│ ├── spark-application.conf
│ ├── spark.batch.conf.template
│ └── spark.streaming.conf.template
└── lib
├── seatunnel-core-flink.jar
└── seatunnel-core-spark.jar
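One way to assemble this layout from the build tree, sketched with find since module paths differ between versions (the directory name my-seatunnel is arbitrary):
mkdir -p my-seatunnel/bin my-seatunnel/config my-seatunnel/lib
find . -name "start-seatunnel-*.sh" -exec cp {} my-seatunnel/bin/ \;
find . -name "seatunnel-env.sh" -exec cp {} my-seatunnel/config/ \;
find . -name "*.conf.template" -exec cp {} my-seatunnel/config/ \;
find . -path "*target*" -name "seatunnel-core-*.jar" -exec cp {} my-seatunnel/lib/ \;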
on Spark
Set the environment variable:
export SPARK_HOME=/Users/abeffect/share/spark-2.4.8-bin-hadoop2.7
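The launcher scripts also read config/seatunnel-env.sh, so the engine home can be set there instead; in the 2.1.1 layout it amounts to roughly the following (the /opt defaults are an assumption):
# config/seatunnel-env.sh -- used when the variables are not already exported
SPARK_HOME=${SPARK_HOME:-/opt/spark}
FLINK_HOME=${FLINK_HOME:-/opt/flink}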
Run:
./bin/start-seatunnel-spark.sh --master local[4] --deploy-mode client --config ./config/spark-application.conf > spark.log 2>&1
Result:
+-----------+---+----+
|raw_message|msg|name|
+-----------+---+----+
+-----------+---+----+
A second example:
$ ./bin/start-seatunnel-spark.sh --master local[4] --deploy-mode client --config ./config/spark.streaming.conf.template
It prints results in a loop (stop it with Ctrl-C):
+----------------------+-----+------+
|raw_message |msg |name |
+----------------------+-----+------+
|Hello World, SeaTunnel|Hello|World,|
|Hello World, SeaTunnel|Hello|World,|
|Hello World, SeaTunnel|Hello|World,|
|Hello World, SeaTunnel|Hello|World,|
|Hello World, SeaTunnel|Hello|World,|
+----------------------+-----+------+
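Both runs are driven by the HOCON file passed via --config, always in SeaTunnel's env/source/transform/sink shape; the streaming template above evidently splits raw_message into the msg and name columns. A minimal batch-style sketch written as a shell heredoc (plugin names follow the 2.x templates; all values are illustrative):
cat > ./config/my-spark-batch.conf <<'EOF'
env {
  spark.app.name = "SeaTunnel-demo"   # passed through to spark-submit
  spark.executor.instances = 1
}
source {
  Fake { result_table_name = "fake" } # built-in test source
}
transform {
  sql { sql = "select * from fake" }  # optional SQL step between source and sink
}
sink {
  Console {}                          # print the rows to stdout
}
EOF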
on Flink
Start the Flink cluster first:
cd flink-1.13.6
bin/start-cluster.sh
Then, back in the SeaTunnel directory, run:
./bin/start-seatunnel-flink.sh --config ./config/flink.streaming.conf.template
The output keeps streaming:
+I[Gary, 1652546771158]
+I[Ricky Huo, 1652546772162]
+I[Kid Xiong, 1652546773167]
+I[Ricky Huo, 1652546774169]
+I[Gary, 1652546775173]
+I[Kid Xiong, 1652546776175]
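The streaming job runs until it is cancelled; it can be listed and stopped with the standard Flink CLI (assuming FLINK_HOME points at the flink-1.13.6 directory; the job ID comes from the list output):
$FLINK_HOME/bin/flink list            # show running jobs and their IDs
$FLINK_HOME/bin/flink cancel <jobID>  # stop the SeaTunnel streaming job
$FLINK_HOME/bin/stop-cluster.sh       # then shut the cluster down as before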