Versions in this module Expand all Collapse all v1 v1.0.2 Feb 7, 2025 Changes in this version type Message + Channel string + Topic string v1.0.1 Feb 7, 2025 Changes in this version + const FrameTypeError + const FrameTypeMessage + const FrameTypeResponse + const MsgIDLength + const StateConnected + const StateDisconnected + const StateInit + const VERSION + var ErrAlreadyConnected = errors.New("already connected") + var ErrClosing = errors.New("closing") + var ErrNotConnected = errors.New("not connected") + var ErrOverMaxInFlight = errors.New("over configure max-inflight") + var ErrStopped = errors.New("stopped") + var MagicV1 = []byte(" V1") + var MagicV2 = []byte(" V2") + func IsValidChannelName(name string) bool + func IsValidTopicName(name string) bool + func ReadResponse(r io.Reader, maxMsgSize int32) ([]byte, error) + func ReadUnpackedResponse(r io.Reader, maxMsgSize int32) (int32, []byte, error) + func UnpackResponse(response []byte) (int32, []byte, error) + type AuthResponse struct + Identity string + IdentityUrl string + PermissionCount int64 + type BackoffStrategy interface + Calculate func(attempt int) time.Duration + type Command struct + Body []byte + Name []byte + Params [][]byte + func Auth(secret string) (*Command, error) + func DeferredPublish(topic string, delay time.Duration, body []byte) *Command + func Finish(id MessageID) *Command + func Identify(js map[string]interface{}) (*Command, error) + func MultiPublish(topic string, bodies [][]byte) (*Command, error) + func Nop() *Command + func Ping() *Command + func Publish(topic string, body []byte) *Command + func Ready(count int) *Command + func Register(topic string, channel string) *Command + func Requeue(id MessageID, delay time.Duration) *Command + func StartClose() *Command + func Subscribe(topic string, channel string) *Command + func Touch(id MessageID) *Command + func UnRegister(topic string, channel string) *Command + func (c *Command) String() string + func (c *Command) WriteTo(w io.Writer) (int64, error) + type Config struct + AuthSecret string + BackoffMultiplier time.Duration + BackoffStrategy BackoffStrategy + ClientID string + DefaultRequeueDelay time.Duration + Deflate bool + DeflateLevel int + DialTimeout time.Duration + HeartbeatInterval time.Duration + Hostname string + LocalAddr net.Addr + LookupdAuthorization bool + LookupdPollInterval time.Duration + LookupdPollJitter float64 + LookupdPollTimeout time.Duration + LowRdyIdleTimeout time.Duration + LowRdyTimeout time.Duration + MaxAttempts uint16 + MaxBackoffDuration time.Duration + MaxInFlight int + MaxMsgSize int32 + MaxRequeueDelay time.Duration + MsgTimeout time.Duration + OutputBufferSize int64 + OutputBufferTimeout time.Duration + RDYRedistributeInterval time.Duration + ReadTimeout time.Duration + SampleRate int32 + Snappy bool + TlsConfig *tls.Config + TlsV1 bool + UserAgent string + WriteTimeout time.Duration + func NewConfig() *Config + func (c *Config) Set(option string, value interface{}) error + func (c *Config) Validate() error + type ConfigFlag struct + Config *Config + func (c *ConfigFlag) Set(opt string) (err error) + func (c *ConfigFlag) String() string + type Conn struct + func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn + func (c *Conn) Close() error + func (c *Conn) Connect() (*IdentifyResponse, error) + func (c *Conn) Flush() error + func (c *Conn) IsClosing() bool + func (c *Conn) LastMessageTime() time.Time + func (c *Conn) LastRDY() int64 + func (c *Conn) LastRdyTime() time.Time + func (c *Conn) MaxRDY() int64 + func (c *Conn) RDY() int64 + func (c *Conn) Read(p []byte) (int, error) + func (c *Conn) RemoteAddr() net.Addr + func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) + func (c *Conn) SetLoggerForLevel(l logger, lvl LogLevel, format string) + func (c *Conn) SetLoggerLevel(lvl LogLevel) + func (c *Conn) SetRDY(rdy int64) + func (c *Conn) String() string + func (c *Conn) Write(p []byte) (int, error) + func (c *Conn) WriteCommand(cmd *Command) error + type ConnDelegate interface + OnBackoff func(*Conn) + OnClose func(*Conn) + OnContinue func(*Conn) + OnError func(*Conn, []byte) + OnHeartbeat func(*Conn) + OnIOError func(*Conn, error) + OnMessage func(*Conn, *Message) + OnMessageFinished func(*Conn, *Message) + OnMessageRequeued func(*Conn, *Message) + OnResponse func(*Conn, []byte) + OnResume func(*Conn) + type Consumer struct + StopChan chan int + func NewConsumer(topic string, channel string, config *Config) (*Consumer, error) + func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) + func (r *Consumer) AddHandler(handler Handler) + func (r *Consumer) ChangeMaxInFlight(maxInFlight int) + func (r *Consumer) ConnectToNSQD(addr string) error + func (r *Consumer) ConnectToNSQDs(addresses []string) error + func (r *Consumer) ConnectToNSQLookupd(addr string) error + func (r *Consumer) ConnectToNSQLookupds(addresses []string) error + func (r *Consumer) DisconnectFromNSQD(addr string) error + func (r *Consumer) DisconnectFromNSQLookupd(addr string) error + func (r *Consumer) IsStarved() bool + func (r *Consumer) SetBehaviorDelegate(cb interface{}) + func (r *Consumer) SetLogger(l logger, lvl LogLevel) + func (r *Consumer) SetLoggerForLevel(l logger, lvl LogLevel) + func (r *Consumer) SetLoggerLevel(lvl LogLevel) + func (r *Consumer) SetLookupdHttpClient(httpclient *http.Client) + func (r *Consumer) Stats() *ConsumerStats + func (r *Consumer) Stop() + type ConsumerStats struct + Connections int + MessagesFinished uint64 + MessagesReceived uint64 + MessagesRequeued uint64 + type DiscoveryFilter interface + Filter func([]string) []string + type ErrIdentify struct + Reason string + func (e ErrIdentify) Error() string + type ErrProtocol struct + Reason string + func (e ErrProtocol) Error() string + type ExponentialStrategy struct + func (s *ExponentialStrategy) Calculate(attempt int) time.Duration + type FailedMessageLogger interface + LogFailedMessage func(message *Message) + type FullJitterStrategy struct + func (s *FullJitterStrategy) Calculate(attempt int) time.Duration + type Handler interface + HandleMessage func(message *Message) error + type HandlerFunc func(message *Message) error + func (h HandlerFunc) HandleMessage(m *Message) error + type IdentifyResponse struct + AuthRequired bool + Deflate bool + MaxRdyCount int64 + Snappy bool + TLSv1 bool + type LogLevel int + const LogLevelDebug + const LogLevelError + const LogLevelInfo + const LogLevelMax + const LogLevelWarning + func (lvl LogLevel) String() string + type Message struct + Attempts uint16 + Body []byte + Delegate MessageDelegate + ID MessageID + NSQDAddress string + Timestamp int64 + func DecodeMessage(b []byte) (*Message, error) + func NewMessage(id MessageID, body []byte) *Message + func (m *Message) DisableAutoResponse() + func (m *Message) Finish() + func (m *Message) HasResponded() bool + func (m *Message) IsAutoResponseDisabled() bool + func (m *Message) Requeue(delay time.Duration) + func (m *Message) RequeueWithoutBackoff(delay time.Duration) + func (m *Message) Touch() + func (m *Message) WriteTo(w io.Writer) (int64, error) + type MessageDelegate interface + OnFinish func(*Message) + OnRequeue func(m *Message, delay time.Duration, backoff bool) + OnTouch func(*Message) + type MessageID [MsgIDLength]byte + type Producer struct + func NewProducer(addr string, config *Config) (*Producer, error) + func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error + func (w *Producer) DeferredPublishAsync(topic string, delay time.Duration, body []byte, ...) error + func (w *Producer) MultiPublish(topic string, body [][]byte) error + func (w *Producer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction, ...) error + func (w *Producer) Ping() error + func (w *Producer) Publish(topic string, body []byte) error + func (w *Producer) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction, ...) error + func (w *Producer) SetLogger(l logger, lvl LogLevel) + func (w *Producer) SetLoggerForLevel(l logger, lvl LogLevel) + func (w *Producer) SetLoggerLevel(lvl LogLevel) + func (w *Producer) Stop() + func (w *Producer) String() string + type ProducerTransaction struct + Args []interface{} + Error error