Flink与Spring生态完美融合的脚手架工程
还在为开发Flink流处理应用程序时无法像开发Spring Boot程序那么优雅的分层以及装配Bean而烦恼吗?
你可能面临如下苦恼:
1. 开发的Flink流处理应用程序,业务逻辑全部写在Flink的操作符中,代码无法复用,无法分层
2. 要是有一天它可以像开发Spring Boot程序那样可以优雅的分层,优雅的装配Bean,不需要自己new对象好了
3. 可以使用各种Spring生态的框架,一些琐碎的逻辑不再硬编码到代码中。
GitHub最近超火的一款开源框架,Flink-Boot脚手架,该脚手架简直是Spring开发工程师的福音,完美融合Spring生态体系,再也不需要手动在Java类中创建臃肿的Java对象,简直是开发大型流处理应用程序的必不可少的工具。[Flink-Boot 脚手架由《深入理解Flink核心设计与实践原理》作者开发。仓库地址:github搜索Flink-Boot即可获得脚手架工程。
接口缓存
你的现状
static Map<String,String> cache=new HashMap<String,String>(); public String findUUID(FlowData flowData) { String value=cache.get(flowData.getSubTestItem()); if(value==null) { String uuid=userMapper.findUUID(flowData); cache.put(uuid,value); return uuid; } return value; }
你想要的是这样
@Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem") public String findUUID(FlowData flowData) { return userMapper.findUUID(flowData); }
重试机制
你的现状
public void insertFlow(FlowData flowData) { try{ userMapper.insertFlow(flowData); }Cache(Exception e) { Thread.sleep(10000); userMapper.insertFlow(flowData); } }
你想要的是这样
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) @Override public void insertFlow(FlowData flowData) { userMapper.insertFlow(flowData); }
Bean校验
你的现状
if(flowData.getSubTestItem().length()<2&&flowData.getSubTestItem().length()>7) { return null; } if(flowData.getBillNumber()==null) { return null; }
你想要的是这样
Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData); if (validate != null) { System.out.println(validate); return null; } public class FlowData { private String uuid; //声明该参数的校验规则字符串长度必须在7到20之间 @Size(min = 7, max = 20, message = "长度必须在{min}-{max}之间") private String subTestItem; //声明该参数的校验规则字符串不能为空 @NotBlank(message = "billNumber不能为空") private String billNumber; }
等等......
GitHub最近超火的一款开源框架,懒松鼠Flink-Boot脚手架,该脚手架简直是Spring开发工程师的福音,完美融合Spring生态体系,再也不需要手动在Java类中创建臃肿的Java对象,简直是开发大型流处理应用程序的必不可少的工具。[懒松鼠Flink-Boot 脚手架由《深入理解Flink核心设计与实践原理》作者开发。](intsmaze/flink-boot)
它为流计算开发工程师解决了
1. 将所有对象的创建和依赖关系的维护工作都交给Spring容器的管理,降低了对象之间的耦合性,使代码变得更简洁,拒绝臃肿。
2. 消除在工程中对单例的过多使用。
3. 声明式事务处理,通过配置就可以完成对事物的管理,而无须手动编程。
4. 声明式注解,可以通过注解定义方法的缓冲功能,无序手动编程。
5. 注解式定义Bean对象的校验规则,通过注解即可完成对对象的参数校验,无序手动编程。
6. 集成MyBatis ORM框架,注解式维护实例对象的依赖关系。
7. 解耦Flink SQL,SQL语句剥离出JAVA文件,以简洁的模式表现在XML文件中。
8. 封装Flink API,仅提供业务方法去编写,Spring生态融合全部搞定,无需操心。
有了它你的代码就像这样子:
/** * github地址: https://github.com/intsmaze * 博客地址:https://www.cnblogs.com/intsmaze/ * 出版书籍《深入理解Flink核心设计与实践原理》 随书代码 * RichFlatMapFunction为Flink框架的一个通用型操作符(算子),开发者一般在该算子的flatMap方法中编写业务逻辑 * @auther: intsmaze(刘洋) * @date: 2020/10/15 18:33 */ public class MybatisFlatMap extends RichFlatMapFunction<String, String> { private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); protected ApplicationContext beanFactory; //mybatis的Service对象,操作数据库的user表 private UserService userService; @Override public void open(Configuration parameters) { ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext() .getExecutionConfig().getGlobalJobParameters(); beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters); userService = beanFactory.getBean(UserServiceImpl.class); } @Override public void flatMap(String value, Collector<String> out){ FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() { }.getType()); Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData); if (validate != null) { System.out.println(validate); return null; } //数据库查询,屏蔽掉获取数据库连接,是否数据库连接,事务的声明等 String flowUUID = userService.findUUID(flowData); if (StringUtils.isBlank(flowUUID)) { flowUUID = UUID.randomUUID().toString(); flowData.setUuid(flowUUID); //数据库插入,屏蔽掉获取数据库连接,是否数据库连接,事务的声明等 userService.insertFlow(flowData); } out.collect(gson.toJson(flowData)); } } public interface UserService { String findUUID(FlowData flowData); void insertFlow(FlowData flowData); } //通过注解实例化Bean对象。 @Service //通过注解声明进行事务管理 @Transactional //通过注解声明方法具有异常重试机制 @EnableRetry public class UserServiceImpl implements UserService { //通过注解进行依赖注入 @Resource private UserMapper userMapper; @Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem") @Override public String findUUID(FlowData flowData) { return userMapper.findUUID(flowData); } //通过注解声明该方法异常后的重试机制,无需手动编程 @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) @Override public void insertFlow(FlowData flowData) { userMapper.insertFlow(flowData); } } public interface UserMapper { String findUUID(FlowData flowData); void insertFlow(FlowData flowData); } //注解式声明参数校验规则 public class FlowData { private String uuid; //声明该参数的校验规则字符串长度必须在7到20之间 @Size(min = 7, max = 20, message = "长度必须在{min}-{max}之间") private String subTestItem; //声明该参数的校验规则字符串不能为空 @NotBlank(message = "billNumber不能为空") private String billNumber; @NotBlank(message = "barcode不能为空") private String barcode; private String flowName; private String flowStatus; ......
[懒松鼠Flink-Boot],仓库地址:github搜索Flink-Boot
1. 该脚手架屏蔽掉组装Flink API细节,让跨界变得简单,使得开发者能以传统Java WEB模式的开发方式开发出具备分布式计算能力的流处理程序。
2. 开发者完全不需要理解分布式计算的理论知识和Flink框架的细节,便可以快速编写业务代码实现。
3. 为了进一步提升开发者使用该脚手架开发大型项目的敏捷的程度,该脚手架工程默认集成Spring框架进行Bean管理,同时将微服务以及WEB开发领域中经常用到的框架集成进来,进一步提升开发速度。
4. 除此之外针对目前流行的各大Java框架,该Flink脚手架工程也进行了集成,加快开发人员的编码速度,比如:
* 集成Jbcp-template对Mysql,Oracle,SQLServer等关系型数据库的快速访问。
* 集成Hibernate Validator框架进行参数校验。
* 集成Spring Retry框架进行重试标志。
* 集成Mybatis框架,提高对关系型数据库增,删,改,查的开发速度。
* 集成Spring Cache框架,实现注解式定义方法缓存。
* ......