NSQ
nsq简介
NSQ是一个基于Go语言的分布式实时消息平台,NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。
nsq组件构成
NSQ是由四个重要组件构成:
- nsqd:一个负责接收、排队、转发消息到客户端的守护进程
- nsqlookupd:管理拓扑信息并提供最终一致性的发现服务的守护进程
- nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
- utilities:常见基础功能、数据流处理工具,如nsqstat、nsqtail、nsqtofile、nsqtohttp、nsqtonsq、to_nsq
NSQ生产特点
- 支持消息内存队列的大小设置,默认完全持久化(值为0),消息即可持久到磁盘也可以保存在内存中
- 保证消息至少传递一次,以确保消息可以最终成功发送
- 收到的消息是无序的, 实现了松散订购
- 发现服务nsqlookupd具有最终一致性,消息最终能够找到所有Topic生产者
单个nsqd可以有多个Topic,每个Topic又可以有多个Channel。Channel能够接收Topic所有消息的副本,从而实现了消息多播分发;而Channel上的每个消息被分发给它的订阅者,从而实现负载均衡,所有这些就组成了一个可以表示各种简单和复杂拓扑结构的强大框架。
NSQ的主要特点
- 具有分布式且无单点故障的拓扑结构 支持水平扩展,在无中断情况下能够无缝地添加集群节点
- 低延迟的消息推送,参见官方提供的性能说明文档
- 具有组合式的负载均衡和多播形式的消息路由
- 既擅长处理面向流(高吞吐量)的工作负载,也擅长处理面向Job的(低吞吐量)工作负载
- 消息数据既可以存储于内存中,也可以存储在磁盘中
- 实现了生产者、消费者自动发现和消费者自动连接生产者,参见nsqlookupd
- 支持安全传输层协议(TLS),从而确保了消息传递的安全性
- 具有与数据格式无关的消息结构,支持JSON、Protocol Buffers、MsgPacek等消息格式
- 非常易于部署(几乎没有依赖)和配置(所有参数都可以通过命令行进行配置)
- 使用了简单的TCP协议且具有多种语言的客户端功能库
- 具有用于信息统计、管理员操作和实现生产者等的HTTP接口
- 为实时检测集成了统计数据收集器StatsD
- 具有强大的集群管理界面,参见nsqadmin
NSQ安装
二进制安装
说明:二进制文件可以直接运行,不需要安装go环境,因为已经编译好了。
下载二进制: wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz
解压之后放在/usr/local/nsq下面, 并新建立几个目录:mkdir /usr/local/nsq/{data,config,log}
默认配置文件可以从以下下载:https://github.com/nsqio/nsq/blob/master/contrib/nsqlookupd.cfg.example
配置启动项: 在/usr/lib/systemd/system下面建立以下几个启动项:
- nsqlookupd.service:
[Unit] Description=NSQLookupD After=network.target [Service] LimitCORE=infinity LimitNOFILE=100000 LimitNPROC=100000 WorkingDirectory=/usr/local/nsq ExecStart=/usr/local/nsq/bin/nsqlookupd -config=/usr/local/nsq/config/nsqlookupd.cfg ExecReload=/bin/kill -HUP $MAINPID Type=simple KillMode=process Restart=on-failure RestartSec=10s User=root [Install] WantedBy=multi-user.target
- nsqd.service:
[Unit] Description=NSQD After=network.target [Service] LimitCORE=infinity LimitNOFILE=100000 LimitNPROC=100000 WorkingDirectory=/usr/local/nsq ExecStart=/usr/local/nsq/bin/nsqd -config=/usr/local/nsq/config/nsqd.cfg ExecReload=/bin/kill -HUP $MAINPID Type=simple KillMode=process Restart=on-failure RestartSec=10s User=root [Install] WantedBy=multi-user.target
- nsqadmin:
[Unit] Description=NSQAdmin After=network.target [Service] LimitCORE=infinity LimitNOFILE=100000 LimitNPROC=100000 WorkingDirectory=/usr/local/nsq ExecStart=/usr/local/nsq/bin/nsqadmin -config=/usr/local/nsq/config/nsqadmin.cfg ExecReload=/bin/kill -HUP $MAINPID Type=simple KillMode=process Restart=on-failure RestartSec=10s User=root [Install] WantedBy=multi-user.target
配置文件如下:
- nsqlookupd.cfg:
## log verbosity level: debug, info, warn, error, or fatal log-level = "warn" ## : to listen on for TCP clients tcp_address = "0.0.0.0:4160" ## : to listen on for HTTP clients http_address = "0.0.0.0:4161" ## address that will be registered with lookupd (defaults to the OS hostname) # broadcast_address = "" ## duration of time a producer will remain in the active list since its last ping inactive_producer_timeout = "300s" ## duration of time a producer will remain tombstoned if registration remains tombstone_lifetime = "45s"
- nsqd.cfg:
## log verbosity level: debug, info, warn, error, or fatal log-level = "warn" ## : to listen on for TCP clients tcp_address = "0.0.0.0:4160" ## : to listen on for HTTP clients http_address = "0.0.0.0:4161" ## address that will be registered with lookupd (defaults to the OS hostname) # broadcast_address = "" ## duration of time a producer will remain in the active list since its last ping inactive_producer_timeout = "300s" ## duration of time a producer will remain tombstoned if registration remains tombstone_lifetime = "45s" [root@node1 config]# cat nsqd.cfg ## log verbosity level: debug, info, warn, error, or fatal log-level = "error" ## unique identifier (int) for this worker (will default to a hash of hostname) # id = 5150 ## : to listen on for TCP clients tcp_address = "0.0.0.0:4150" ## : to listen on for HTTP clients http_address = "0.0.0.0:4151" ## : to listen on for HTTPS clients # https_address = "0.0.0.0:4152" ## address that will be registered with lookupd (defaults to the OS hostname) # broadcast_address = "" ## cluster of nsqlookupd TCP addresses nsqlookupd_tcp_addresses = [ "127.0.0.1:4160" ] ## duration to wait before HTTP client connection timeout http_client_connect_timeout = "2s" ## duration to wait before HTTP client request timeout http_client_request_timeout = "5s" ## path to store disk-backed messages data_path = "/usr/local/nsq/data" ## number of messages to keep in memory (per topic/channel) mem_queue_size = 10000 ## number of bytes per diskqueue file before rolling max_bytes_per_file = 104857600 ## number of messages per diskqueue fsync sync_every = 2500 ## duration of time per diskqueue fsync (time.Duration) sync_timeout = "2s" ## duration to wait before auto-requeing a message msg_timeout = "60s" ## maximum duration before a message will timeout max_msg_timeout = "15m" ## maximum size of a single message in bytes max_msg_size = 1024768 ## maximum requeuing timeout for a message max_req_timeout = "1h" ## maximum size of a single command body max_body_size = 5123840 ## maximum client configurable duration of time between client heartbeats max_heartbeat_interval = "60s" ## maximum RDY count for a client max_rdy_count = 2500 ## maximum client configurable size (in bytes) for a client output buffer max_output_buffer_size = 65536 ## maximum client configurable duration of time between flushing to a client (time.Duration) max_output_buffer_timeout = "1s" ## UDP : of a statsd daemon for pushing stats #statsd_address = "127.0.0.1:8125" ## prefix used for keys sent to statsd (%s for host replacement) statsd_prefix = "nsq.%s" ## duration between pushing to statsd (time.Duration) statsd_interval = "60s" ## toggle sending memory and GC stats to statsd statsd_mem_stats = true ## message processing time percentiles to keep track of (float) e2e_processing_latency_percentiles = [ 100.0, 99.0, 95.0 ] ## calculate end to end latency quantiles for this duration of time (time.Duration) e2e_processing_latency_window_time = "10m" ## path to certificate file tls_cert = "" ## path to private key file tls_key = "" ## set policy on client certificate (require - client must provide certificate, ## require-verify - client must provide verifiable signed certificate) # tls_client_auth_policy = "require-verify" ## set custom root Certificate Authority # tls_root_ca_file = "" ## require client TLS upgrades tls_required = false ## minimum TLS version ("ssl3.0", "tls1.0," "tls1.1", "tls1.2") tls_min_version = "" ## enable deflate feature negotiation (client compression) deflate = true ## max deflate compression level a client can negotiate (> values == > nsqd CPU usage) max_deflate_level = 6 ## enable snappy feature negotiation (client compression) snappy = true
- nsqadmin.cfg:
## log verbosity level: debug, info, warn, error, or fatal log-level = "info" ## : to listen on for HTTP clients http_address = "0.0.0.0:4171" ## graphite HTTP address graphite_url = "192.168.7.230:4171" ## proxy HTTP requests to graphite proxy_graphite = false ## prefix used for keys sent to statsd (%s for host replacement, must match nsqd) statsd_prefix = "nsq.%s" ## format of statsd counter stats statsd_counter_format = "stats.counters.%s.count" ## format of statsd gauge stats statsd_gauge_format = "stats.gauges.%s" ## time interval nsqd is configured to push to statsd (must match nsqd) statsd_interval = "60s" ## HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent notification_http_endpoint = "" ## nsqlookupd HTTP addresses nsqlookupd_http_addresses = [ "127.0.0.1:4161" ] ## nsqd HTTP addresses (optional) #nsqd_http_addresses = [ # "127.0.0.1:4151" #]
然后就可以启动了,但注意启动顺序:nsqlookup,nsqd,nsqadmin
源码安装
源码安装需要安装go环境: go版本需要1.6以上
- go环境安装: go二进制安装,如果放在/usr/local下面,环境变量配置如下:
export PATH=$PATH:/usr/local/go/bin
如果安装在自定义目录 需要定义goroot,配置如下:
export GOROOT=$HOME/go1.X export PATH=$PATH:$GOROOT/bin
- 需要安装gpm命令 安装如下:
git clone https://github.com/pote/gpm.git && cd gpm $ git checkout v1.4.0 # You can ignore this part if you want to install HEAD. $ ./configure $ make install
参考: https://github.com/pote/gpm
NSQ运行说明
nsqlookup默认监听4160和4160两个端口,4160建立一个tcp server,4160与nsqd进行数据沟通,4161会建立一个http server,用于和nsqadmin进行数据交互。
nsqd 默认会启动4151端口,接收http方式发送来的消息数据,tcp:4150
nsqadmin提供了一个可以访问的web界面,用于实时查看集群状态和执行一些管理操作,端口http:4171
如果nsqlookupd有多个服务,nsqd.cfg的配置可以如下修改:
nsqlookupd_tcp_addresses = [ “127.0.0.1:4160″, “192.168.1.*:4160″, …. ]
docker启动命令
docker启动可以用docker-compose来启动,这样不需要指定ip,也是就可以的,etcd也是一样,因为docker本身也有服务发现(etcd)和dnsmap。
#!/bin/bash
docker run \
--name nslookup \
-p 4160:4160 \
-p 4161:4161 \
-v /etc/localtime:/etc/localtime:ro \
-d \
--network clc \
--ip 172.18.0.111 \
nsqio/nsq \
/nsqlookupd
docker run \
--name nsqd \
-p 4150:4150 \
-p 4151:4151 \
-v /etc/localtime:/etc/localtime:ro \
-d \
--network clc \
--ip 172.18.0.112 \
-v /home/docker/nsq/data:/data \
nsqio/nsq \
/nsqd \
--broadcast-address=192.168.7.211 \
--lookupd-tcp-address=192.168.7.211:4160 \
--data-path=/data
docker run \
--name nsqadmin \
-p 4171:4171 \
-v /etc/localtime:/etc/localtime:ro \
-d \
--network clc \
--ip 172.18.0.113 \
nsqio/nsq \
/nsqadmin \
--lookupd-http-address=192.168.7.211:4161
原文链接:NSQ,转载请注明来源!