Flink快速入门教程:从零开始掌握大数据流处理技术

2026-02-08 0 492

初涉Flink领域,你是否觉得安装、设置、编写代码的过程让人头疼?许多人遇到这些入门难题,但实际上,只要掌握了正确的方法,就能轻松入门。

Flink基础回顾

 aliMaven
 aliyun Maven
 http://Maven.aliyun.com/nexus/content/groups/public/
 central 

Flink有其独特的基本原理、架构和众多组成部分。这些构成了掌握Flink的关键。比如,掌握其架构有助于理解数据的流动和处理过程。有位朋友在学习Flink时,跳过基础原理直接尝试案例,结果遇到了不少难题。只有对基础知识有深入理解,后续操作才会更加得心应手。在真实的开发环境中,许多团队都要求开发者先掌握Flink的基础知识。

在大数据领域,Flink的相关知识同样至关重要。众多大数据企业在招聘过程中,往往会涉及这一话题。熟练掌握Flink,有助于求职者在竞争激烈的市场中脱颖而出。

 
 org.apache.flink 
 flink-java 
 1.6.1 
provided 
 
 
 org.apache.flink 
 flink-streaming-java_2.11 
 1.6.1 
 provided 

选择开发语言

 
 org.apache.flink 
 flink-scala_2.11 
 1.6.1 
provided 
 
 
 org.apache.flink 
 flink-streaming-scala_2.11 
 1.6.1 
provided 

在编写Flink程序时,可以选择Java或Scala。Scala在实现函数式编程时显得更为精炼。我的一位同事起初用Java编写Flink程序,代码冗长,维护起来颇为不易。后来他转而使用Scala,程序状况得到了显著提升。尽管如此,Java也有其独特优势,比如众多开发者对Java更为熟悉。开发语言的选择受到多种因素影响,包括开发环境和个人偏好。在有些项目里,团队成员普遍擅长Java,那么选用Java来开发Flink程序也是合情合理的。

配置国内镜像与依赖管理

Flink快速入门教程:从零开始掌握大数据流处理技术

使用阿里云的Maven仓库镜像,需要调整.conf/.xml文件。在Maven中管理依赖项,必须先完成配置。我遇到一个项目,由于Maven仓库镜像配置不当,依赖项下载变得极其缓慢。同样,在Maven项目的pom.xml文件中也需要进行配置。不同版本需要去Maven仓库寻找相应的配置。例如,之前的一个小项目使用的是旧版本,配置过程颇为复杂。

不同语言的配置差异

package xuwei.tech.streaming;
import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.Java.utils.ParameterTool;
import org.apache.Flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.Flink.runtime.state.filesystem.FsStateBackend;
import org.apache.Flink.runtime.state.memory.MemoryStateBackend;
import org.apache.Flink.streaming.api.DataStream.DataStream;
import org.apache.Flink.streaming.api.DataStream.DataStreamSource;
import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.Flink.streaming.api.windowing.time.Time;
import org.apache.Flink.util.Collector;
/**
 * 单词计数之滑动窗口计算
 *
 * Created by xuwei.tech
 */
public class SocketWindowWordCountJava {
 public static void main(String[] args) throws Exception{
 //获取需要的端口号
 int port;
 try {
 ParameterTool parameterTool = ParameterTool.fromArgs(args);
 port = parameterTool.getInt(\"port\");
 }catch (Exception e){
 System.err.println(\"No port set. use default port 9000--Java\");
 port = 9000;
 }
 //获取Flink的运行环境
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 String hostname = \"hadoop100\";
 String delimiter = \"n\";
 //连接Socket获取输入的数据
 DataStreamSource text = env.socketTextStream(hostname, port, delimiter);
 // a a c
 // a 1
 // a 1
 // c 1
 DataStream windowCounts = text.flatMap(new FlatMapFunction 
() {
 public void flatMap(String value, Collector out) throws 
Exception {
 String[] splits = value.split(\"\\s\");
 for (String word : splits) {
 out.collect(new WordWithCount(word, 1L));
 }
 }
 }).keyBy(\"word\")
 .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2s,指定时间间隔为1s
 .sum(\"count\");//在这里使用sum或者reduce都可以
 /*.reduce(new ReduceFunction() {
 public WordWithCount reduce(WordWithCount a, 
WordWithCount b) throws Exception {
 return new WordWithCount(a.word,a.count+b.count);
 }
 })*/
 //把数据打印到控制台并且设置并行度
 windowCounts.print().setParallelism(1);
 //这一行代码一定要实现,否则程序不执行
 env.execute(\"Socket window count\");
 }
 public static class WordWithCount{
 public String word;
 public long count;
 public WordWithCount(){}
 public WordWithCount(String word,long count){
 this.word = word;
 this.count = count;
 }
 @Override
 public String toString() {
 return \"WordWithCount{\" +
 \"word=\'\" + word + \'\'\' +
 \", count=\" + count +
 \'}\';
 }
 }
}

在Java编程中需要加入特定的Java配置,Scala编程同样需要相应的配置。我之前曾将这两种语言的配置搞混。一旦配置错误,程序便可能无法正常运作。此外,不同版本的配置要求各异,有时为了与旧版本兼容,还需付出额外努力。

在开发工具中运行和打包的要点

package xuwei.tech.streaming
import org.apache.Flink.api.Java.utils.ParameterTool
import org.apache.Flink.streaming.api.Scala.StreamExecutionEnvironment
import org.apache.Flink.streaming.api.windowing.time.Time
/**
 * 单词计数之滑动窗口计算
 *
 * Created by xuwei.tech
 */
object SocketWindowWordCountScala {
 def main(args: Array[String]): Unit = {
 //获取Socket端口号
 val port: Int = try {
 ParameterTool.fromArgs(args).getInt(\"port\")
 }catch {
 case e: Exception => {
 System.err.println(\"No port set. use default port 9000--Scala\")
 }
 9000
 }
 //获取运行环境
 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
 //连接Socket获取输入数据
 val text = env.socketTextStream(\"hadoop100\",port,\'n\')
 //解析数据(把数据打平),分组,窗口计算,并且聚合求sum
 //注意:必须要添加这一行隐式转行,否则下面的FlatMap方法执行会报错
 import org.apache.Flink.api.Scala._
 val windowCounts = text.flatMap(line => line.split(\"\\s\"))//打平,把每一行单词都切开
 .map(w => WordWithCount(w,1))//把单词转成word , 1这种形式
 .keyBy(\"word\")//分组
 .timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小,指定间隔时间
 .sum(\"count\");// sum或者reduce都可以
 //.reduce((a,b)=>WordWithCount(a.word,a.count+b.count))
 //打印到控制台
 windowCounts.print().setParallelism(1);
 //执行任务
 env.execute(\"Socket window count\");
 }
 case class WordWithCount(word: String,count: Long)
}

[root@hadoop100 soft]# nc -l 9000
a
b
a

使用IDEA等开发环境执行代码时,记得将依赖配置中的scope属性注释掉。而在制作JAR包时,则需确保这一属性被启用。举例来说,若之前某个项目在打包时遗漏了这一步骤,结果生成的JAR包体积庞大,还附带了许多不必要的依赖。Flink具备延迟计算的特性,只有调用特定方法才会启动执行。在调试过程中,我为此特性耗费了不少精力,不过它也确实简化了复杂程序的编写。

案例需求分析与数据处理

WordWithCount{word=\'a\', count=1}
WordWithCount{word=\'b\', count=1}
WordWithCount{word=\'a\', count=2}
WordWithCount{word=\'b\', count=1}
WordWithCount{word=\'a\', count=1}

手工生成单词,Flink实时获取数据,并在特定窗口内进行汇总计算。需要为多种语言添加Maven依赖。之前曾遇到过类似需求,仅配置依赖就耗费了大量时间。按照要求完成依赖添加后,在IDEA中执行代码即可获得结果。此外,之后还可以尝试使用Flink的Batch离线批处理功能。

package xuwei.tech.batch;
import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.Java.DataSet;
import org.apache.Flink.api.Java.ExecutionEnvironment;
import org.apache.Flink.api.Java.operators.DataSource;
import org.apache.Flink.api.Java.tuple.Tuple2;
import org.apache.Flink.util.Collector;
/**
 *单词计数之离线计算
 *
 * Created by xuwei.tech
 */
public class BatchWordCountJava {
 public static void main(String[] args) throws Exception{
 String inputPath = \"D:\\data\\file\";
 String outPath = \"D:\\data\\result\";
 //获取运行环境
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 //获取文件中的内容
 DataSource text = env.readTextFile(inputPath);
 DataSet<Tuple2> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
 counts.writeAsCsv(outPath,\"n\",\" \").setParallelism(1);
 env.execute(\"batch word count\");
 }
 public static class Tokenizer implements FlatMapFunction<String,Tuple2>{
 public void flatMap(String value, Collector<Tuple2> out) 
throws Exception {
 String[] tokens = value.toLowerCase().split(\"\\W+\");
 for (String token: tokens) {
 if(token.length()>0){
 out.collect(new Tuple2(token,1));
 }
 }
 }
 }
}

最后我想请教各位,在学习Flink的过程中,大家觉得哪一部分最让人感到棘手?期待大家的点赞和转发。

package xuwei.tech.batch
import org.apache.Flink.api.Scala.ExecutionEnvironment
/**
 * 单词计数之离线计算
 * Created by xuwei.tech
 */
object BatchWordCountScala {
 def main(args: Array[String]): Unit = {
 val inputPath = \"D:\\data\\file\"
 val outPut = \"D:\\data\\result\"
 val env = ExecutionEnvironment.getExecutionEnvironment
 val text = env.readTextFile(inputPath)
 //引入隐式转换
 import org.apache.Flink.api.Scala._
 val counts = text.flatMap(_.toLowerCase.split(\"\\W+\"))
 .filter(_.nonEmpty)
 .map((_,1))
 .groupBy(0)
 .sum(1)
 counts.writeAsCsv(outPut,\"n\",\" \").setParallelism(1)
 env.execute(\"batch word count\")
 }
}

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

申明:本文由第三方发布,内容仅代表作者观点,与本网站无关。对本文以及其中全部或者部分内容的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。本网发布或转载文章出于传递更多信息之目的,并不意味着赞同其观点或证实其描述,也不代表本网对其真实性负责。

左子网 开发教程 Flink快速入门教程:从零开始掌握大数据流处理技术 https://www.zuozi.net/79386.html

常见问题
  • 1、自动:拍下后,点击(下载)链接即可下载;2、手动:拍下后,联系卖家发放即可或者联系官方找开发者发货。
查看详情
  • 1、源码默认交易周期:手动发货商品为1-3天,并且用户付款金额将会进入平台担保直到交易完成或者3-7天即可发放,如遇纠纷无限期延长收款金额直至纠纷解决或者退款!;
查看详情
  • 1、描述:源码描述(含标题)与实际源码不一致的(例:货不对板); 2、演示:有演示站时,与实际源码小于95%一致的(但描述中有”不保证完全一样、有变化的可能性”类似显著声明的除外); 3、发货:不发货可无理由退款; 4、安装:免费提供安装服务的源码但卖家不履行的; 5、收费:价格虚标,额外收取其他费用的(但描述中有显著声明或双方交易前有商定的除外); 6、其他:如质量方面的硬性常规问题BUG等。 注:经核实符合上述任一,均支持退款,但卖家予以积极解决问题则除外。
查看详情
  • 1、左子会对双方交易的过程及交易商品的快照进行永久存档,以确保交易的真实、有效、安全! 2、左子无法对如“永久包更新”、“永久技术支持”等类似交易之后的商家承诺做担保,请买家自行鉴别; 3、在源码同时有网站演示与图片演示,且站演与图演不一致时,默认按图演作为纠纷评判依据(特别声明或有商定除外); 4、在没有”无任何正当退款依据”的前提下,商品写有”一旦售出,概不支持退款”等类似的声明,视为无效声明; 5、在未拍下前,双方在QQ上所商定的交易内容,亦可成为纠纷评判依据(商定与描述冲突时,商定为准); 6、因聊天记录可作为纠纷评判依据,故双方联系时,只与对方在左子上所留的QQ、手机号沟通,以防对方不承认自我承诺。 7、虽然交易产生纠纷的几率很小,但一定要保留如聊天记录、手机短信等这样的重要信息,以防产生纠纷时便于左子介入快速处理。
查看详情

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务