这篇文章主要介绍“flinksql如何链接kafka”,在日常操作中,相信很多人在flinksql如何链接kafka问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”flinksql如何链接kafka”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

目前创新互联已为成百上千的企业提供了网站建设、域名、虚拟主机、网站托管运营、企业网站设计、碌曲网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
4.0.0 org.example flinksqldemo 1.0-SNAPSHOT UTF-8 UTF-8 2.11 2.11.8 0.10.2.1 1.12.0 2.7.3 compile org.apache.maven.plugins maven-compiler-plugin 8 8 org.apache.flink flink-table-planner-blink_2.11 1.12.0 org.apache.flink flink-java ${flink.version} ${setting.scope} org.apache.flink flink-streaming-java_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-clients_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-connector-kafka_2.11 1.12.0 org.apache.flink flink-csv 1.12.0 org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} ${setting.scope} org.apache.kafka kafka_${scala.binary.version} ${kafka.version} ${setting.scope} org.apache.hadoop hadoop-common ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-hdfs ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-client ${hadoop.version} ${setting.scope} org.slf4j slf4j-api 1.7.25 com.alibaba fastjson 1.2.72 redis.clients jedis 2.7.3 com.google.guava guava 29.0-jre
代码:
package com.jd.data;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
public class TableApiConnectKafka04 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1、创建表执行环节
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.connect(new Kafka()
.version("0.11") // 定义版本
.topic("xxx") // 定义主题
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
).withFormat(new Csv()).withSchema(new Schema().field("a", DataTypes.STRING()) // 定义表的结构
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
)
.inAppendMode()
.createTemporaryTable("xxx");
Table xxx = tableEnv.from("xxx");
xxx.printSchema();
tableEnv.toAppendStream(xxx, Row.class ).print();
env.execute("job");
}
}到此,关于“flinksql如何链接kafka”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!