Flink与Spring生态完美融合的脚手架工程

还在为开发Flink流处理应用程序时无法像开发Spring Boot程序那么优雅的分层以及装配Bean而烦恼吗?

你可能面临如下苦恼:

1. 开发的Flink流处理应用程序,业务逻辑全部写在Flink的操作符中,代码无法复用,无法分层

2. 要是有一天它可以像开发Spring Boot程序那样可以优雅的分层,优雅的装配Bean,不需要自己new对象好了

3. 可以使用各种Spring生态的框架,一些琐碎的逻辑不再硬编码到代码中。

GitHub最近超火的一款开源框架,Flink-Boot脚手架,该脚手架简直是Spring开发工程师的福音,完美融合Spring生态体系,再也不需要手动在Java类中创建臃肿的Java对象,简直是开发大型流处理应用程序的必不可少的工具。[Flink-Boot 脚手架由《深入理解Flink核心设计与实践原理》作者开发。仓库地址:github搜索Flink-Boot即可获得脚手架工程。


懒松鼠:Flink与Spring生态完美融合的脚手架工程


接口缓存

你的现状

static Map<String,String> cache=new HashMap<String,String>(); public String findUUID(FlowData flowData) {     String value=cache.get(flowData.getSubTestItem());     if(value==null)     {         String uuid=userMapper.findUUID(flowData);         cache.put(uuid,value);         return uuid;     }     return value; }

你想要的是这样

@Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem") public String findUUID(FlowData flowData) {     return userMapper.findUUID(flowData); }

重试机制

你的现状

public void insertFlow(FlowData flowData) {     try{         userMapper.insertFlow(flowData);       }Cache(Exception e)       {          Thread.sleep(10000);          userMapper.insertFlow(flowData);       } } 

你想要的是这样

    @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))     @Override     public void insertFlow(FlowData flowData) {         userMapper.insertFlow(flowData);     }

Bean校验

你的现状

if(flowData.getSubTestItem().length()<2&&flowData.getSubTestItem().length()>7) {     return null; } if(flowData.getBillNumber()==null) {     return null; }

你想要的是这样

Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData); if (validate != null) {     System.out.println(validate);     return null; } public class FlowData {     private String uuid;     //声明该参数的校验规则字符串长度必须在7到20之间     @Size(min = 7, max = 20, message = "长度必须在{min}-{max}之间")     private String subTestItem;     //声明该参数的校验规则字符串不能为空     @NotBlank(message = "billNumber不能为空")     private String billNumber; }

等等......

GitHub最近超火的一款开源框架,懒松鼠Flink-Boot脚手架,该脚手架简直是Spring开发工程师的福音,完美融合Spring生态体系,再也不需要手动在Java类中创建臃肿的Java对象,简直是开发大型流处理应用程序的必不可少的工具。[懒松鼠Flink-Boot 脚手架由《深入理解Flink核心设计与实践原理》作者开发。](intsmaze/flink-boot)

它为流计算开发工程师解决了

1. 将所有对象的创建和依赖关系的维护工作都交给Spring容器的管理,降低了对象之间的耦合性,使代码变得更简洁,拒绝臃肿。

2. 消除在工程中对单例的过多使用。

3. 声明式事务处理,通过配置就可以完成对事物的管理,而无须手动编程。

4. 声明式注解,可以通过注解定义方法的缓冲功能,无序手动编程。

5. 注解式定义Bean对象的校验规则,通过注解即可完成对对象的参数校验,无序手动编程。

6. 集成MyBatis ORM框架,注解式维护实例对象的依赖关系。

7. 解耦Flink SQL,SQL语句剥离出JAVA文件,以简洁的模式表现在XML文件中。

8. 封装Flink API,仅提供业务方法去编写,Spring生态融合全部搞定,无需操心。

有了它你的代码就像这样子:

/** * github地址: https://github.com/intsmaze * 博客地址:https://www.cnblogs.com/intsmaze/ * 出版书籍《深入理解Flink核心设计与实践原理》 随书代码 * RichFlatMapFunction为Flink框架的一个通用型操作符(算子),开发者一般在该算子的flatMap方法中编写业务逻辑 * @auther: intsmaze(刘洋) * @date: 2020/10/15 18:33 */ public class MybatisFlatMap extends RichFlatMapFunction<String, String> {     private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();      protected ApplicationContext beanFactory;     //mybatis的Service对象,操作数据库的user表     private UserService userService;      @Override     public void open(Configuration parameters) {         ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()                 .getExecutionConfig().getGlobalJobParameters();         beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters);                  userService = beanFactory.getBean(UserServiceImpl.class);     }      @Override     public void flatMap(String value, Collector<String> out){          FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() {         }.getType());         Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData);         if (validate != null) {             System.out.println(validate);             return null;         }         //数据库查询,屏蔽掉获取数据库连接,是否数据库连接,事务的声明等         String flowUUID = userService.findUUID(flowData);         if (StringUtils.isBlank(flowUUID)) {             flowUUID = UUID.randomUUID().toString();             flowData.setUuid(flowUUID);             //数据库插入,屏蔽掉获取数据库连接,是否数据库连接,事务的声明等             userService.insertFlow(flowData);         }         out.collect(gson.toJson(flowData));     } }   public interface UserService {      String findUUID(FlowData flowData);      void insertFlow(FlowData flowData); }  //通过注解实例化Bean对象。 @Service //通过注解声明进行事务管理 @Transactional //通过注解声明方法具有异常重试机制 @EnableRetry public class UserServiceImpl implements UserService {    //通过注解进行依赖注入     @Resource     private UserMapper userMapper;      @Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem")     @Override     public String findUUID(FlowData flowData) {         return userMapper.findUUID(flowData);     }         //通过注解声明该方法异常后的重试机制,无需手动编程     @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))     @Override     public void insertFlow(FlowData flowData) {         userMapper.insertFlow(flowData);     } }  public interface UserMapper {      String findUUID(FlowData flowData);      void insertFlow(FlowData flowData); }  //注解式声明参数校验规则 public class FlowData {      private String uuid;     //声明该参数的校验规则字符串长度必须在7到20之间     @Size(min = 7, max = 20, message = "长度必须在{min}-{max}之间")     private String subTestItem;     //声明该参数的校验规则字符串不能为空     @NotBlank(message = "billNumber不能为空")     private String billNumber;      @NotBlank(message = "barcode不能为空")     private String barcode;      private String flowName;      private String flowStatus;      ......

[懒松鼠Flink-Boot],仓库地址:github搜索Flink-Boot

1. 该脚手架屏蔽掉组装Flink API细节,让跨界变得简单,使得开发者能以传统Java WEB模式的开发方式开发出具备分布式计算能力的流处理程序。

2. 开发者完全不需要理解分布式计算的理论知识和Flink框架的细节,便可以快速编写业务代码实现。

3. 为了进一步提升开发者使用该脚手架开发大型项目的敏捷的程度,该脚手架工程默认集成Spring框架进行Bean管理,同时将微服务以及WEB开发领域中经常用到的框架集成进来,进一步提升开发速度。

4. 除此之外针对目前流行的各大Java框架,该Flink脚手架工程也进行了集成,加快开发人员的编码速度,比如:

* 集成Jbcp-template对Mysql,Oracle,SQLServer等关系型数据库的快速访问。

* 集成Hibernate Validator框架进行参数校验。

* 集成Spring Retry框架进行重试标志。

* 集成Mybatis框架,提高对关系型数据库增,删,改,查的开发速度。

* 集成Spring Cache框架,实现注解式定义方法缓存。

* ......

您可能还会对下面的文章感兴趣: