简单易用的任务队列-beanstalkd

概述

beanstalkd 是一个简单快速的分布式工作队列系统,协议基于 ASCII 编码运行在 TCP 上。其最初设计的目的是通过后台异步执行耗时任务的方式降低高容量 Web 应用的页面延时。其具有简单、轻量、易用等特点,也支持对任务优先级、延时/超时重发等控制,同时还有众多语言版本的客户端支持,这些优点使得它成为各种需要队列系统场景的一种常见选择。

beanstalkd 优点

  • 如他官网的介绍,simple&fast,使用非常简单,适合需要引入消息队列又不想引入 kafka 这类重型的 mq,维护成本低;同时,它的性能非常高,大部分场景下都可以 cover 住。
  • 支持持久化
  • 支持消息优先级,topic,延时消息,消息重试等
  • 主流语言客户端都支持,还可以根据 beanstalkd 协议自行实现。

beanstalkd 不足

  • 无最大内存控制,当业务消息极多时,服务可能会不稳定。
  • 官方没有提供集群故障切换方案(主从或哨兵等),需要自己解决。

beanstalkd 重点概念

  • job

任务,队列中的基本单元,每个 job 都会有 id 和优先级。有点类似其他消息队列中的 message 的概念。但 job 有各种状态,下文介绍生命周期部分会重点介绍。job 存放在 tube 中。

  • tube

管道,用来存储同一类型的 job。有点类似其他消息队列中的 topic 的概念。beanstalkd 通过 tube 来实现多任务队列,beanstalkd 中可以有多个管道,每个管道有自己的 producer 和 consumer,管道之间互相不影响。

  • producer

job 生产者。通过 put 命令将一个 job 放入到一个 tube 中。

  • consumer

job 消费者。通过 reserve 来获取 job,通过 delete、release、bury 来改变 job 的状态。

beanstalkd 生命周期

上文介绍到,beanstalkd 中 job 有状态区分,在整个生命周期中,job 可能有四种状态: READY, RESERVED, DELAYED, BURIED。只有处于 READY状态的 job 才能被消费。下图介绍了各状态之间的流转情况。

简单易用的任务队列-beanstalkd

producer 在创建 job 的时候有两种方式,put 和 put with delay(延时任务)。
如果 producer 使用 put 直接创建一个 job 时,该 job 就处于 READY 状态,等待 consumer 处理。
如果 producer 使用 put with delay 方式创建 job,该 job 的初始状态为 DELAYED 状态,等待延迟时间过后才变更为 READY 状态。
以上两种方式创建的 job 都会传入一个 TTR(超时机制),当 job 处于 RESERVED 状态时,TTR 开始倒计时,当 TTR 倒计时完,job 状态还没有改变,则会认为该 job 处理失败,会被重新放回到队列中。

consumer 获取到(reserve)一个 READY 状态的 job 之后,该 job 的状态就会变更为 RESERVED。此时,其他的 consumer 就不能再操作该 job 了。当 consumer 完成该 job 之后,可以选择 delete,release,或 bury 操作。

  • delete ,job 被删除,从 beanstalkd 中清除,以后也无法再获取到,生命周期结束。
  • release ,可以把该 job 重新变更为 READY 状态,使得其他的 consumer 可以继续获取和执行该 job,也可以使用 release with delay 延时操作,这样会先进入 DELAYED 状态,延迟时间到达后再变为 READY。
  • bury,可以将 job 休眠,等需要的时候,在将休眠的 job 通过 kick 命令变更回 READY 状态,也可以通过 delete 直接删除 BURIED 状态的 job 。

处于 BURIED 状态的 job,可以通过 kick 重回 READY 状态,也可以通过 delete 删除 job。

为什么设计这个 BURIED 状态呢?
一般我们可以用这个状态来做异常捕获,例如执行超时或者异常的 job,我们可以将其置为 BURIED 状态,这样做有几个好处:
1.可以便面这些异常的 job 直接被放回队列重试,影响正常的队列消费(这些失败一次的 job,很有可能再次失败)。如果没有这个 BURIED 状态,如果我们要单独隔离,一般我们会使用一个新的 tube 单独存放这些异常的 job,使用单独的 consumer 消费。这样就不会影响正常的新消息消费。特别是失败率比较高的时候,会占用很多的正常资源。
2.便于人工排查,上面已经讲到,可以将异常的 job 置为 BURIED 状态,这样人工排查时重点关注这个状态就可以了。

beanstalkd 特性

持久化

通过 binlog 将 job 及其状态记录到本地文件,当 beanstalkd 重启时,可以通过读取 binlog 来恢复之前的 job 状态。

分布式

在 beanstalkd 的文档中,其实是支持分布式的,其设计思想和 Memcached 类似,beanstalkd 各个 server 之间并不知道彼此的存在,是通过 client 实现分布式以及根据 tube 名称去特定的 server 上获取 job。贴一篇专门讨论 beanstalkd 分布式的文章,Beanstalkd的一种分布式方案

任务延时

天然支持延时任务,可以在创建 job 时指定延时时间,也可以当 job 被处理完后才能后,消费者使用 release with delay 将 job 再次放入队列延时执行。

任务优先级

producer 生成的 job 可以给他分配优先级,支持 0 到 2^32 的优先级,值越小,优先级越高,默认优先级为 1024。优先级高的 job 会被 consumer 优先执行。

超时机制

为了防止某个 consumer 长时间占用 job 但无法处理完成的情况,beanstalkd 的 reserve 操作支持设置 timeout 时间(TTR)。如果 consumer 不能在 TTR 内发送 delete、release 或 bury 命令改变 job 状态,那么 beanstalkd 会认为任务处理失败,会将 job 重新置为 READY 状态供其他 consumer 消费。
如果消费者已经预知可能无法在 TTR 内完成该 job,则可以发送 touch 命令,使得 beanstalkd 重新计算 TTR。

任务预留

有一个 BURIED 状态可以作为缓冲,具体特点见上文生命周期中关于 BURIED 状态的介绍。

安装及配置

以下以 ubuntu 为例,安转 beanstalkd:

sudo apt-get update
sudo apt-get install beanstalkd
vi /etc/sysconfig/beanstalkd
添加如下内容
BEANSTALKD_BINLOG_DIR=/data/beanstalkd/binlog

可以通过 beanstalkd 命令来运行服务,并且可以添加多种参数。命令的格式如下:

beanstalkd [OPTIONS]

 -b DIR   wal directory
 -f MS    fsync at most once every MS milliseconds (use -f0 for "always fsync")
 -F       never fsync (default)
 -l ADDR  listen on address (default is 0.0.0.0)
 -p PORT  listen on port (default is 11300)
 -u USER  become user and group
 -z BYTES set the maximum job size in bytes (default is 65535)
 -s BYTES set the size of each wal file (default is 10485760)
            (will be rounded up to a multiple of 512 bytes)
 -c       compact the binlog (default)
 -n       do not compact the binlog
 -v       show version information
 -V       increase verbosity
 -h       show this help

如下我们启动一个 beanstalkd 服务,并开启 binlog:

nohup beanstalkd -l 0.0.0.0 -p 11300 -b /data/beanstalkd/binlog/ &

beanstalkd管理工具

官方推荐的一些管理工具:Tools
笔者常用的管理工具:https://github.com/ptrofimov/beanstalk_console
如果只是简单的操作和查看 beanstalkd,可以使用 telnet 工具,然后执行 stats,use,put,watch 等:

$ telnet 127.0.0.1 11300
stats

实际应用

beansralkd 有很多语言版本的客户端实现,官方提供了一些客户端列表beanstalkd客户端列表
如果现有的这些库不满足需求,也可以自行实现,参考 beanstalkd协议

以下以 go 为例,简单演示下 beanstalkd 常用处理操作。

go get github.com/beanstalkd/go-beanstalk

生产者

向默认的 tube 中投入 job:

id, err := conn.Put([]byte("myjob"), 1, 0, time.Minute)
if err != nil {
    panic(err)
}
fmt.Println("job", id)

向指定的 tube 中投入 job:

tube := &beanstalk.Tube{Conn: conn, Name: "mytube"}
id, err := tube.Put([]byte("myjob"), 1, 0, time.Minute)
if err != nil {
    panic(err)
}
fmt.Println("job", id)

消费者

消费默认的 tube 中的 job:

id, body, err := conn.Reserve(5 * time.Second)
if err != nil {
    panic(err)
}
fmt.Println("job", id)
fmt.Println(string(body))

消费指定的 tube (此处指定多个) 中的 job:

tubeSet := beanstalk.NewTubeSet(conn, "mytube1", "mytube2")
id, body, err := tubeSet.Reserve(10 * time.Hour)
if err != nil {
    panic(err)
}
fmt.Println("job", id)
fmt.Println(string(body))
  • 可以通过指定 tube ,在 put 的时候将 job 放入指定的 tube 中,否则会放入 default 的 tube 中。
  • beanstalkd 支持持久化,在启动时使用 -b参数来开启 binlog,通过 binog可以将 job 及其状态记录到文件里。当重新使用 -b参数重启 beanstalkd,将读取 binlog来恢复之前的 job 及状态。

参考资料

Original: https://www.cnblogs.com/immaxfang/p/16503956.html
Author: immaxfang
Title: 简单易用的任务队列-beanstalkd

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/582236/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 【socket】基于poll和epoll通信温度上报

    网络socket通信 * – poll函数 – epoll函数 – poll代码实现 – epoll代码实现 poll函数 poll…

    Linux 2023年6月13日
    087
  • jquery ajax提交数据给后端

    大家好,今天铁柱兄给大家带一段jquery ajax提交数据给后端的教学。 初学javaweb的同学前端提交数据基本上都是用form表单提交,这玩意儿反正我是觉得不太好玩。而Jav…

    Linux 2023年6月13日
    077
  • 三少玩Linux之LinuxMint, win7共存安装与简单配置

    先安装win7, 这里就不说了;再安装Mint, 这个是视频:https://www.bilibili.com/video/BV1AE411P7Cz; 这里关键就是LinuxMin…

    Linux 2023年6月14日
    0100
  • SpringBoot 2.1.9 整合 Redisson分布式锁

    官方参考文档 redisson-spring-boot-starter 官方文档 通过YAML文件配置单节点模式 一)、引入Redisson整合Spring Boot依赖 二)、通…

    Linux 2023年5月28日
    095
  • Jenkins 内置变量

    BRANCH_NAME 对于多分支项目,这将设置为正在构建的分支的名称,例如,如果您希望master从功能分支而不是从功能分支部署到生产;如果对应于某种更改请求,则名称通常是任意的…

    Linux 2023年5月27日
    087
  • shell脚本字符串截取方法整理

    首先先声明一个变量str,下面演示以该变量为例: str=’https://www.baidu.com/about.html’ 1.#号截取,删除左边字符,保留右边字符 echo …

    Linux 2023年5月28日
    072
  • cache和内存屏障

    1 cache简介 1.1 cache缓存映射规则 tag查看cache是否匹配,set index |tag |set index |block offset ||20-bit …

    Linux 2023年6月6日
    0101
  • Popovers

    弹出式窗口弹出式窗口是一个短暂的视图,当你点击一个控件或一个区域时,它就会出现在屏幕上的其他内容之上。通常情况下,弹出窗口包括一个箭头,指向它出现的位置。弹出式窗口可以是非模态或模…

    Linux 2023年6月7日
    081
  • Mysql在linux对大小写敏感的设置

    编辑/etc/my.cnf文件,在[mysqld]节下 添加 lower_case_table_names 参数,并设置相应的值 (备注:为0时大小写敏感,为1时大小写不敏感,默认…

    Linux 2023年6月13日
    086
  • 1.2

    数字信号为什么不能远程传播?高频率->传的短 容易被干扰 答案是可以的。 数字信号传输编码的目的:保证数据传送的可靠性 数据传输的关键指标: 延迟和吞吐量 posted @2…

    Linux 2023年6月6日
    085
  • 2021年1月-第02阶段-前端基础-HTML+CSS进阶-VS Code 软件

    软件安装 VSCode软件 能够安装 VS Code 能够熟练使用 VS Code 软件 能够安装 VS Code 最常用的插件 1. VS Code简介 1.1 VS Code …

    Linux 2023年6月8日
    078
  • 国产化之银河麒麟安装达梦数据库DM8

    背景 某个项目需要实现基础软件全部国产化,其中操作系统指定银河麒麟,数据库使用DM8。 虽然在之前的文章中已经成功模拟国产飞腾处理器,但是运行效率不高,所以这里的银河麒麟操作系统还…

    Linux 2023年5月27日
    094
  • 网络安全中常用浏览器插件、拓展

    引言 现在的火狐、Edge( Chromium内核)、Chrome等浏览器带有插件、拓展(Plugin)的功能。这些插件中有的可以过滤广告,有的提供便捷的翻译,有的提供JavaSc…

    Linux 2023年6月6日
    084
  • 2021年3月-第01阶段-Linux基础-Linux系统的启动流程

    Linux系统的启动流程 理解Linux操作系统启动流程,能有助于后期在企业中更好的维护Linux服务器,能快速定位系统问题,进而解决问题。 上图为Linux操作系统启动流程 1….

    Linux 2023年6月8日
    0103
  • phpweb成品网站最新版(注入、上传、写shell)

    注入:之所以鸡肋就是该漏洞利用安装文件 重新生成 配置文件 写入可执行代码 鸡肋1: 具有破坏性 动作非常大 重新写了配置文件 数据库连接文件鸡肋2: 有一定安全常识的站长都会删掉…

    Linux 2023年5月28日
    074
  • 【MQTT】iniparser库的安装和使用

    iniparser库 * – iniparser库介绍 – 下载库 – iniparser中的API – dictionary中的一…

    Linux 2023年6月13日
    087
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球