seatunnel初体验(二):数据源的使用
背景
在完成了seatunnel的环境配置后,需要熟悉上下游各种数据源。
初体验
先以spark batch为例。seatunnel上spark支持的数据源比flink多,然后spark更擅长batch处理,而不是streaming处理。
最简单的配置
Fake -> Console
一次性输入,控制台输出。
source {
Fake {
result_table_name = "my_dataset"
}
}
transform {
}
sink {
Console {}
}
结果
+------------------+
|raw_message |
+------------------+
|Hello garyelephant|
|Hello rickyhuo |
|Hello kid-xiong |
+------------------+
说明
输入源Fake:没有参数。
输出源Console:默认展示100条,文本格式。
Json格式输出
Console,使用Json格式输出。
console {
limit = 10,
serializer = "json"
}
{"raw_message":"Hello garyelephant"}
{"raw_message":"Hello rickyhuo"}
{"raw_message":"Hello kid-xiong"}
流式输入
source {
FakeStream {
content = ["name=ricky&age=23", "name=gary&age=28"]
rate = 5
}
}
会连续输出了
{"raw_message":"name=gary&age=28"}
{"raw_message":"name=ricky&age=23"}
{"raw_message":"name=gary&age=28"}
{"raw_message":"name=gary&age=28"}
{"raw_message":"name=gary&age=28"}
{"raw_message":"name=ricky&age=23"}
{"raw_message":"name=gary&age=28"}
{"raw_message":"name=ricky&age=23"}
{"raw_message":"name=gary&age=28"}
{"raw_message":"name=ricky&age=23"}
File -> Console
Spark支持本地的文件格式有:text, parquet, json, orc, csv.[3],默认为json。
source {
file {
path = "file:///Users/abeffect/IdeaProjects/seatunnel/play/data"
result_table_name = "access_log"
}
}
测试了一下,seatunnel会扫描path下的所有文件,然后依次读取每一个文件。
File -> ClickHouse
ClickHouse {
host = "localhost:8123"
database = "nginx"
table = "access_msg"
fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
username = "username"
password = "password"
bulk_size = 20000
retry_codes = [209, 210]
retry = 3
}
Hive -> Console
把 hive-site.xml
文件,复制到 spark/conf
目录下。
把 hdfs-site.xml
文件,复制到 spark/conf
目录下。
env {
...
spark.sql.catalogImplementation = "hive"
...
}
source {
hive {
pre_sql = "select * from mydb.mytb"
result_table_name = "myTable"
}
}