数据库pipeline用法(pipeline是什么意思)

目标通过提供初始输入并传递处理后的输出以供下一阶段使用,从而允许在一系列阶段中进行数据处理。解释Pipeline模式为管道模式,也称为流水线模式。通过预先设定好的一系列的阶段来处理输入的数据,每个阶段的输出即是下一个阶段的输入。模型

目标

通过提供初始输入并传递处理后的输出以供下一阶段使用,从而允许在一系列阶段中进行数据处理。

解释

Pipeline模式为管道模式,也称为流水线模式。通过预先设定好的一系列的阶段来处理输入的数据,每个阶段的输出即是下一个阶段的输入。

模型图如下:

数据库pipeline用法(pipeline是什么意思)

pipeline模式

从图中可以看出,整个流水线内数据流转是从上游到下游,上游的输出是下游的输入,按阶段依次执行。

Source: 表示数据来源,比如:KafkaSource。

Channel:表示对数据进行处理的组件,比如:JsonChannel,对数据进行json转换和处理。

Sink:表示数据落地或下沉的地方,比如:KafkaSink,表示数据发送到指定的kafka;DbSInk表示数据落地到DB。

可以看出,Pipeline是由Source(必须有),Channel(不一定需要),Sink(必须有)三种类型的组件自由组合而成的。

代码示例

/**
* 生命周期
*/
public interface LifeCycle {
/**
* 初始化
* @param config
*/
void init(String config);
/**
* 启动
*/
void startup();
/**
* 结束
*/
void shutdown();
}

/**
* 组件
*/
public interface Component<T> extends LifeCycle {
/**
* 组件名称
* @return
*/
String getName();
/**
* 获取下游组件
* @return
*/
Collection<Component> getDownStrems();
/**
* 执行
*/
void execute(T o);
}

/**
* 组件抽象实现
* @param <T> 输入
* @param <R> 输出
*/
public abstract class AbstractComponent<T, R> implements Component<T>{
@Override
public void execute(T o) {
// 当前组件执行
R r = doExecute(o);
System.out.println(getName() + \" receive \" + o + \" return \" + r);
// 获取下游组件,并执行
Collection<Component> downStreams = getDownStrems();
if (!CollectionUtils.isEmpty(downStreams)) {
downStreams.forEach(c -> c.execute(r));
}
}
/**
* 具体组件执行处理
* @param o 传入的数据
* @return
*/
protected abstract R doExecute(T o);
@Override
public void startup() {
// 下游 -> 上游 依次启动
Collection<Component> downStreams = getDownStrems();
if (!CollectionUtils.isEmpty(downStreams)) {
downStreams.forEach(Component::startup);
}
// do startup
System.out.println(\"--------- \" + getName() + \" is start --------- \");
}
@Override
public void shutdown() {
// 上游 -> 下游 依次关闭
// do shutdown
System.out.println(\"--------- \" + getName() + \" is shutdown --------- \");
Collection<Component> downStreams = getDownStrems();
if (!CollectionUtils.isEmpty(downStreams)) {
downStreams.forEach(Component::shutdown);
}
}
}

/**
* 数据来源
*/
public abstract class Source<T, R> extends AbstractComponent<T, R>{
}

/**
* 数据处理
*/
public abstract class Channel<T, R> extends AbstractComponent<T, R> {
}

/**
* 数据落地/下沉
*/
public abstract class Sink<T, R> extends AbstractComponent<T, R> {
}

上面我们封装了基本的组件实现,下面扩展一下具体的实现,用一个简单的例子说明:

IntegerSource -> IncrChannel -> StringChannel -> ConsoleSink

从上面组件名称和方向可以判断出来我们要做的流水线是什么,大概过程如:

输入一个数字 -> 数字+1 -> 转为字符串 -> 控制台输出

那么我们开始来实现这个过程吧。

/**
* 来源
*/
public class IntegerSource extends Source<Integer, Integer> {
private int val = 0;
@Override
protected Integer doExecute(Integer o) {
return o;
}
@Override
public void init(String config) {
System.out.println(\"--------- \" + getName() + \" init --------- \");
val = 1;
}
@Override
public void startup() {
super.startup();
execute(val);
}
@Override
public String getName() {
return \"Integer-Source\";
}
@Override
public Collection<Component> getDownStrems() {
return Collections.singletonList(new IncrChannel());
}
}

/**
* 处理:数字+1
*/
public class IncrChannel extends Channel<Integer, Integer> {
@Override
protected Integer doExecute(Integer o) {
return o + 1;
}
@Override
public String getName() {
return \"Incr-Channel\";
}
@Override
public Collection<Component> getDownStrems() {
return Collections.singletonList(new StringChannel());
}
@Override
public void init(String config) {
}
}

/**
* 处理:转为字符串
*/
public class StringChannel extends Channel<Integer, String> {
@Override
protected String doExecute(Integer o) {
return \"str\" + o;
}
@Override
public String getName() {
return \"String-Channel\";
}
@Override
public Collection<Component> getDownStrems() {
return Collections.singletonList(new ConsoleSink());
}
@Override
public void init(String config) {
}
}

/**
* 控制台
*/
public class ConsoleSink extends Sink<String, Void> {
@Override
protected Void doExecute(String o) {
return null;
}
@Override
public String getName() {
return \"Console-Sink\";
}
@Override
public Collection<Component> getDownStrems() {
return null;
}
@Override
public void init(String config) {
}
}

好了,扩展实现已完成,整个流水线基本已设置好,我们来测试一下吧

/**
* 流水线
*/
public class Pipeline implements LifeCycle{
/**
* 数据源
*/
private Source source;
public Pipeline(Source source) {
this.source = source;
}
@Override
public void init(String config) {
// 初始化
System.out.println(\"--------- Pipeline init --------- \");
source.init(null);
}
@Override
public void startup() {
// 启动
System.out.println(\"--------- Pipeline startup --------- \");
source.startup();
}
@Override
public void shutdown() {
// 结束
source.shutdown();
System.out.println(\"--------- Pipeline shutdown --------- \");
}
}

Pipeline pipeline = new Pipeline(new IntegerSource());
pipeline.init(null);
pipeline.startup();
pipeline.shutdown();

执行后结果如下:

——— Pipeline init ———

——— Integer-Source init ———

——— Pipeline startup ———

——— Console-Sink is start ———

——— String-Channel is start ———

——— Incr-Channel is start ———

——— Integer-Source is start ———

Integer-Source receive 1 return 1

Incr-Channel receive 1 return 2

String-Channel receive 2 return str2

Console-Sink receive str2 return null

——— Integer-Source is shutdown ———

——— Incr-Channel is shutdown ———

——— String-Channel is shutdown ———

——— Console-Sink is shutdown ———

——— Pipeline shutdown ———

总结

本文我们介绍了常见的设计模式之Pipeline模式,并通过简单的代码示例说明了这种模式的实现及目标。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,请发送邮件至 55@qq.com 举报,一经查实,本站将立刻删除。转转请注明出处:https://www.szhjjp.com/n/62161.html

(0)
nan
上一篇 2021-12-12
下一篇 2021-12-12

相关推荐

  • flash如何做透明背景(flash导入透明背景图片)

    flash如何做透明背景,flash导入透明背景图片 内容导航: 如何将flash变成透明的背景 怎么用FLASH制作透明背景的文字效果 制作FLASH的时候如何让背景透明 在fl…

    2022-08-13
    0
  • 「虚拟主机通过什么」虚拟主机是干嘛的

    虚拟主机通过什么,虚拟主机是干嘛的 内容导航: 虚拟主机技术通过什么来架设网站 虚拟机的机器码与主机的机器吗是否一样 虚拟主机有什么功能 虚拟主机使用详细步骤是什么 一、虚拟主机技…

    2022-05-18
    0
  • 企业icp备案是什么意思(icp备案是什么意思什么是ICP备案)

    企业icp备案是什么意思,icp备案是什么意思什么是ICP备案内容导航:什么是ICP备案什么是icp备案广州icp经营性备案是什么意思企业更名后网站的icp备案需要更改吗一、什么是ICP备案ICP备案:是指网站的备案,这个备案是提交到通信管理局的,最后拿

    2022-04-29
    0
  • dreamwaver如何让两个图片挨着(word如何把两张图片合在一起)

    dreamwaver如何让两个图片挨着,word如何把两张图片合在一起 内容导航: Dreamweaver 8如何让两张图片紧挨 用dreamwaver怎么在已添加的图片后面加入文…

    2022-08-20
    0
  • iis如何重定向(iis重定向次数过多)

    iis如何重定向,iis重定向次数过多内容导航:如何通过IIS设置301重定向win7iis部署网站怎么配置HTTP重定向iis怎么做301重定向windows2012iis301重定向怎么做一、如何通过IIS设置301重定向1、IIS下的301设置在Intern

    2022-04-23
    0
  • hi畅享60pro是华为手机吗

    不少用户对于hi畅享60pro这款手机还是很疑惑的,从命名上来看这款手机和华为的畅享系列十分的接近,而又支持鸿蒙系统,不过这款手机并不是华为手机。hi畅享60pro是华为手机吗答:不是华为手机。 这款手机是法国品牌WIKO旗下的HI畅享系列,不过手机是支持鸿蒙系统的,而且也可以体验华为生态产品互联体验。hi畅享60pro手机介绍1、Hi畅享60Pro于7月10日发布,正面搭载6.67英寸LCD直屏

    2024-02-09
    0

发表回复

登录后才能评论