首页 » 分布式 » NSQ

NSQ

 
文章目录

NSQ

nsq简介

NSQ是一个基于Go语言的分布式实时消息平台,NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。

官网:http://nsq.io

nsq组件构成

NSQ是由四个重要组件构成:

  • nsqd:一个负责接收、排队、转发消息到客户端的守护进程
  • nsqlookupd:管理拓扑信息并提供最终一致性的发现服务的守护进程
  • nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
  • utilities:常见基础功能、数据流处理工具,如nsqstat、nsqtail、nsqtofile、nsqtohttp、nsqtonsq、to_nsq

NSQ生产特点

  • 支持消息内存队列的大小设置,默认完全持久化(值为0),消息即可持久到磁盘也可以保存在内存中
  • 保证消息至少传递一次,以确保消息可以最终成功发送
  • 收到的消息是无序的, 实现了松散订购
  • 发现服务nsqlookupd具有最终一致性,消息最终能够找到所有Topic生产者

单个nsqd可以有多个Topic,每个Topic又可以有多个Channel。Channel能够接收Topic所有消息的副本,从而实现了消息多播分发;而Channel上的每个消息被分发给它的订阅者,从而实现负载均衡,所有这些就组成了一个可以表示各种简单和复杂拓扑结构的强大框架。

NSQ的主要特点

  • 具有分布式且无单点故障的拓扑结构 支持水平扩展,在无中断情况下能够无缝地添加集群节点
  • 低延迟的消息推送,参见官方提供的性能说明文档
  • 具有组合式的负载均衡和多播形式的消息路由
  • 既擅长处理面向流(高吞吐量)的工作负载,也擅长处理面向Job的(低吞吐量)工作负载
  • 消息数据既可以存储于内存中,也可以存储在磁盘中
  • 实现了生产者、消费者自动发现和消费者自动连接生产者,参见nsqlookupd
  • 支持安全传输层协议(TLS),从而确保了消息传递的安全性
  • 具有与数据格式无关的消息结构,支持JSON、Protocol Buffers、MsgPacek等消息格式
  • 非常易于部署(几乎没有依赖)和配置(所有参数都可以通过命令行进行配置)
  • 使用了简单的TCP协议且具有多种语言的客户端功能库
  • 具有用于信息统计、管理员操作和实现生产者等的HTTP接口
  • 为实时检测集成了统计数据收集器StatsD
  • 具有强大的集群管理界面,参见nsqadmin

NSQ安装

二进制安装

说明:二进制文件可以直接运行,不需要安装go环境,因为已经编译好了。

下载二进制: wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz

解压之后放在/usr/local/nsq下面, 并新建立几个目录:mkdir /usr/local/nsq/{data,config,log}

默认配置文件可以从以下下载:https://github.com/nsqio/nsq/blob/master/contrib/nsqlookupd.cfg.example

配置启动项: 在/usr/lib/systemd/system下面建立以下几个启动项:

  • nsqlookupd.service:
[Unit]
Description=NSQLookupD
After=network.target

[Service]
LimitCORE=infinity
LimitNOFILE=100000
LimitNPROC=100000
WorkingDirectory=/usr/local/nsq
ExecStart=/usr/local/nsq/bin/nsqlookupd -config=/usr/local/nsq/config/nsqlookupd.cfg
ExecReload=/bin/kill -HUP $MAINPID
Type=simple
KillMode=process
Restart=on-failure
RestartSec=10s
User=root

[Install]
WantedBy=multi-user.target
  • nsqd.service:
[Unit]
Description=NSQD
After=network.target

[Service]
LimitCORE=infinity
LimitNOFILE=100000
LimitNPROC=100000
WorkingDirectory=/usr/local/nsq
ExecStart=/usr/local/nsq/bin/nsqd -config=/usr/local/nsq/config/nsqd.cfg
ExecReload=/bin/kill -HUP $MAINPID
Type=simple
KillMode=process
Restart=on-failure
RestartSec=10s
User=root

[Install]
WantedBy=multi-user.target
  • nsqadmin:
[Unit]
Description=NSQAdmin
After=network.target

[Service]
LimitCORE=infinity
LimitNOFILE=100000
LimitNPROC=100000
WorkingDirectory=/usr/local/nsq
ExecStart=/usr/local/nsq/bin/nsqadmin -config=/usr/local/nsq/config/nsqadmin.cfg
ExecReload=/bin/kill -HUP $MAINPID
Type=simple
KillMode=process
Restart=on-failure
RestartSec=10s
User=root

[Install]
WantedBy=multi-user.target

配置文件如下:

  • nsqlookupd.cfg:
## log verbosity level: debug, info, warn, error, or fatal
log-level = "warn"

## : to listen on for TCP clients
tcp_address = "0.0.0.0:4160"

## : to listen on for HTTP clients
http_address = "0.0.0.0:4161"

## address that will be registered with lookupd (defaults to the OS hostname)
# broadcast_address = ""


## duration of time a producer will remain in the active list since its last ping
inactive_producer_timeout = "300s"

## duration of time a producer will remain tombstoned if registration remains
tombstone_lifetime = "45s"
  • nsqd.cfg:
## log verbosity level: debug, info, warn, error, or fatal
log-level = "warn"

## : to listen on for TCP clients
tcp_address = "0.0.0.0:4160"

## : to listen on for HTTP clients
http_address = "0.0.0.0:4161"

## address that will be registered with lookupd (defaults to the OS hostname)
# broadcast_address = ""


## duration of time a producer will remain in the active list since its last ping
inactive_producer_timeout = "300s"

## duration of time a producer will remain tombstoned if registration remains
tombstone_lifetime = "45s"
[root@node1 config]# cat nsqd.cfg 
## log verbosity level: debug, info, warn, error, or fatal
log-level = "error"

## unique identifier (int) for this worker (will default to a hash of hostname)
# id = 5150

## : to listen on for TCP clients
tcp_address = "0.0.0.0:4150"

## : to listen on for HTTP clients
http_address = "0.0.0.0:4151"

## : to listen on for HTTPS clients
# https_address = "0.0.0.0:4152"

## address that will be registered with lookupd (defaults to the OS hostname)
# broadcast_address = ""

## cluster of nsqlookupd TCP addresses
nsqlookupd_tcp_addresses = [
    "127.0.0.1:4160"
]

## duration to wait before HTTP client connection timeout
http_client_connect_timeout = "2s"

## duration to wait before HTTP client request timeout
http_client_request_timeout = "5s"

## path to store disk-backed messages
data_path = "/usr/local/nsq/data"

## number of messages to keep in memory (per topic/channel)
mem_queue_size = 10000

## number of bytes per diskqueue file before rolling
max_bytes_per_file = 104857600

## number of messages per diskqueue fsync
sync_every = 2500

## duration of time per diskqueue fsync (time.Duration)
sync_timeout = "2s"


## duration to wait before auto-requeing a message
msg_timeout = "60s"

## maximum duration before a message will timeout
max_msg_timeout = "15m"

## maximum size of a single message in bytes
max_msg_size = 1024768

## maximum requeuing timeout for a message
max_req_timeout = "1h"

## maximum size of a single command body
max_body_size = 5123840


## maximum client configurable duration of time between client heartbeats
max_heartbeat_interval = "60s"

## maximum RDY count for a client
max_rdy_count = 2500

## maximum client configurable size (in bytes) for a client output buffer
max_output_buffer_size = 65536

## maximum client configurable duration of time between flushing to a client (time.Duration)
max_output_buffer_timeout = "1s"


## UDP : of a statsd daemon for pushing stats
#statsd_address = "127.0.0.1:8125"

## prefix used for keys sent to statsd (%s for host replacement)
statsd_prefix = "nsq.%s"

## duration between pushing to statsd (time.Duration)
statsd_interval = "60s"

## toggle sending memory and GC stats to statsd
statsd_mem_stats = true


## message processing time percentiles to keep track of (float)
e2e_processing_latency_percentiles = [
    100.0,
    99.0,
    95.0
]

## calculate end to end latency quantiles for this duration of time (time.Duration)
e2e_processing_latency_window_time = "10m"


## path to certificate file
tls_cert = ""

## path to private key file
tls_key = ""

## set policy on client certificate (require - client must provide certificate,
##  require-verify - client must provide verifiable signed certificate)
# tls_client_auth_policy = "require-verify"

## set custom root Certificate Authority
# tls_root_ca_file = ""

## require client TLS upgrades
tls_required = false

## minimum TLS version ("ssl3.0", "tls1.0," "tls1.1", "tls1.2")
tls_min_version = ""

## enable deflate feature negotiation (client compression)
deflate = true

## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)
max_deflate_level = 6

## enable snappy feature negotiation (client compression)
snappy = true
  • nsqadmin.cfg:
## log verbosity level: debug, info, warn, error, or fatal
log-level = "info"

## : to listen on for HTTP clients
http_address = "0.0.0.0:4171"

## graphite HTTP address
graphite_url = "192.168.7.230:4171"

## proxy HTTP requests to graphite
proxy_graphite = false

## prefix used for keys sent to statsd (%s for host replacement, must match nsqd)
statsd_prefix = "nsq.%s"

## format of statsd counter stats
statsd_counter_format = "stats.counters.%s.count"

## format of statsd gauge stats
statsd_gauge_format = "stats.gauges.%s"

## time interval nsqd is configured to push to statsd (must match nsqd)
statsd_interval = "60s"

## HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
notification_http_endpoint = ""


## nsqlookupd HTTP addresses
nsqlookupd_http_addresses = [
    "127.0.0.1:4161"
]

## nsqd HTTP addresses (optional)
#nsqd_http_addresses = [
#    "127.0.0.1:4151"
#]

然后就可以启动了,但注意启动顺序:nsqlookup,nsqd,nsqadmin

源码安装

源码安装需要安装go环境: go版本需要1.6以上

  • go环境安装: go二进制安装,如果放在/usr/local下面,环境变量配置如下:
    export PATH=$PATH:/usr/local/go/bin
    如果安装在自定义目录 需要定义goroot,配置如下:
export GOROOT=$HOME/go1.X
export PATH=$PATH:$GOROOT/bin
  • 需要安装gpm命令 安装如下:
 git clone https://github.com/pote/gpm.git && cd gpm
$ git checkout v1.4.0 # You can ignore this part if you want to install HEAD.
$ ./configure
$ make install

参考: https://github.com/pote/gpm

NSQ运行说明

nsqlookup默认监听4160和4160两个端口,4160建立一个tcp server,4160与nsqd进行数据沟通,4161会建立一个http server,用于和nsqadmin进行数据交互。

nsqd 默认会启动4151端口,接收http方式发送来的消息数据,tcp:4150

nsqadmin提供了一个可以访问的web界面,用于实时查看集群状态和执行一些管理操作,端口http:4171

如果nsqlookupd有多个服务,nsqd.cfg的配置可以如下修改:

nsqlookupd_tcp_addresses = [
“127.0.0.1:4160″,
“192.168.1.*:4160″,
….
]

docker启动命令

docker启动可以用docker-compose来启动,这样不需要指定ip,也是就可以的,etcd也是一样,因为docker本身也有服务发现(etcd)和dnsmap。

#!/bin/bash
docker run \
  	--name nslookup \
	-p 4160:4160 \
	-p 4161:4161 \
	-v /etc/localtime:/etc/localtime:ro \
	-d \
	--network clc \
	--ip 172.18.0.111 \
	nsqio/nsq \
	/nsqlookupd



docker run \
	--name nsqd \
	-p 4150:4150 \
	-p 4151:4151 \
	-v /etc/localtime:/etc/localtime:ro \
	-d \
	--network clc \
	--ip 172.18.0.112 \
	-v /home/docker/nsq/data:/data \
	nsqio/nsq \
	/nsqd \
	--broadcast-address=192.168.7.211 \
	--lookupd-tcp-address=192.168.7.211:4160 \
	--data-path=/data	

docker run \
	--name nsqadmin \
	-p 4171:4171 \
	-v /etc/localtime:/etc/localtime:ro \
	-d \
	--network clc \
	--ip 172.18.0.113 \
	nsqio/nsq \
	/nsqadmin \
	--lookupd-http-address=192.168.7.211:4161

 

 

原文链接:NSQ,转载请注明来源!

1