Documentation
¶
Index ¶
- Variables
- func AnyToString(v any) string
- func CtxWithLogger(ctx context.Context, v Logger) context.Context
- func CtxWithPingPong(ctx context.Context, v WaitPingPong) context.Context
- func ErrorExtractCode(err error) int
- func ErrorJoin3rdParty(myErr error, Err3rd error) error
- func ErrorJoin3rdPartyWithMsg(myErr error, Err3rd error, msg string, args ...any) error
- func ErrorWrapWithMessage(myErr error, msg string, args ...any) error
- func GenerateRandomCode(length int) string
- func GenerateUlid() string
- func LookupLogLevel(level LogLevel) string
- func PutMessage(message *Message)
- func ReliableTask(task func() error, allowStop func() bool, retryMaxSecond int, ...) error
- func SendPingWaitPong(sendPingSeconds int, sendPing func() error, waitPong WaitPingPong, ...) error
- func SetDefaultLogger(l Logger)
- func UseSkipMessage() func(message *Message, dep any) error
- func WaitPingSendPong(waitPingSeconds int, waitPing WaitPingPong, sendPong func() error, ...) error
- type Adapter
- func (adp *Adapter) Identifier() string
- func (adp *Adapter) IsStopped() bool
- func (adp *Adapter) Listen() (err error)
- func (adp *Adapter) Log() Logger
- func (adp *Adapter) OnDisconnect(terminates ...func(adp IAdapter))
- func (adp *Adapter) RawInfra() any
- func (adp *Adapter) RawSend(messages ...*Message) error
- func (adp *Adapter) Send(messages ...*Message) error
- func (adp *Adapter) SetLog(logger Logger)
- func (adp *Adapter) Stop() error
- func (adp *Adapter) WaitStop() chan struct{}
- type AdapterHub
- type AdapterOption
- func (opt *AdapterOption) AdapterHub(hub AdapterHub) *AdapterOption
- func (opt *AdapterOption) Build() (adp IAdapter, err error)
- func (opt *AdapterOption) DecorateAdapter(wrap func(adapter IAdapter) (application IAdapter)) *AdapterOption
- func (opt *AdapterOption) EgressMux(mux *Mux) *AdapterOption
- func (opt *AdapterOption) Identifier(identifier string) *AdapterOption
- func (opt *AdapterOption) IngressMux(mux *Mux) *AdapterOption
- func (opt *AdapterOption) Lifecycle(setup func(life *Lifecycle)) *AdapterOption
- func (opt *AdapterOption) Logger(logger Logger) *AdapterOption
- func (opt *AdapterOption) RawFixup(maxRetrySecond int, fixup func(IAdapter) error) *AdapterOption
- func (opt *AdapterOption) RawInfra(infra any) *AdapterOption
- func (opt *AdapterOption) RawRecv(recv func(logger Logger) (message *Message, err error)) *AdapterOption
- func (opt *AdapterOption) RawSend(send func(logger Logger, message *Message) error) *AdapterOption
- func (opt *AdapterOption) RawStop(stop func(logger Logger) error) *AdapterOption
- func (opt *AdapterOption) SendPing(sendPingSeconds int, waitPong WaitPingPong, sendPing func(IAdapter) error) *AdapterOption
- func (opt *AdapterOption) WaitPing(waitPingSeconds int, waitPing WaitPingPong, sendPong func(IAdapter) error) *AdapterOption
- type Consumer
- type CustomError
- type HandleFunc
- type Hub
- func (hub *Hub) Count(filter func(IAdapter) bool) int
- func (hub *Hub) DoAsync(action func(IAdapter))
- func (hub *Hub) DoSync(action func(IAdapter) (stop bool))
- func (hub *Hub) FindByKey(key string) (adp IAdapter, found bool)
- func (hub *Hub) FindMulti(filter func(IAdapter) bool) (all []IAdapter, found bool)
- func (hub *Hub) FindMultiByKey(keys []string) (all []IAdapter, found bool)
- func (hub *Hub) FindOne(filter func(IAdapter) bool) (adp IAdapter, found bool)
- func (hub *Hub) IsShutdown() bool
- func (hub *Hub) Join(key string, adp IAdapter) error
- func (hub *Hub) RemoveByKey(key string)
- func (hub *Hub) RemoveMulti(filter func(IAdapter) bool)
- func (hub *Hub) RemoveMultiByKey(keys []string)
- func (hub *Hub) RemoveOne(filter func(IAdapter) bool)
- func (hub *Hub) SetConcurrencyQty(concurrencyQty int)
- func (hub *Hub) Shutdown()
- func (hub *Hub) Total() int
- func (hub *Hub) UpdateByOldKey(oldKey string, update func(IAdapter) (freshKey string)) error
- func (hub *Hub) WaitShutdown() chan struct{}
- type IAdapter
- type Lifecycle
- type LogLevel
- type Logger
- type Message
- type Middleware
- func UseAsync() Middleware
- func UseCopyMessage() Middleware
- func UseExclude(subjects []string) Middleware
- func UseHowMuchTime() Middleware
- func UseInclude(subjects []string) Middleware
- func UseLogger(withMsgId bool, safeConcurrency SafeConcurrencyKind) Middleware
- func UseRecover() Middleware
- func UseRetry(retryMaxSecond int) Middleware
- type Mux
- func (mux *Mux) DefaultHandler(h HandleFunc, mw ...Middleware) *Mux
- func (mux *Mux) EnableMessagePool() *Mux
- func (mux *Mux) Endpoints(action func(subject, handler string))
- func (mux *Mux) ErrorHandler(errHandlers ...Middleware) *Mux
- func (mux *Mux) Group(groupName string) *Mux
- func (mux *Mux) GroupByNumber(groupName int) *Mux
- func (mux *Mux) HandleMessage(message *Message, dependency any) (err error)
- func (mux *Mux) Handler(subject string, h HandleFunc, mw ...Middleware) *Mux
- func (mux *Mux) HandlerByNumber(subject int, h HandleFunc, mw ...Middleware) *Mux
- func (mux *Mux) Middleware(middlewares ...Middleware) *Mux
- func (mux *Mux) NotFoundHandler(h HandleFunc) *Mux
- func (mux *Mux) PostMiddleware(handleFuncs ...HandleFunc) *Mux
- func (mux *Mux) PreMiddleware(handleFuncs ...HandleFunc) *Mux
- func (mux *Mux) Transform(transform HandleFunc) *Mux
- type Producer
- type Prosumer
- type SafeConcurrencyKind
- type Shutdown
- type UsePrintResult
- func (use UsePrintResult) IgnoreErrSubjects(subjects ...string) UsePrintResult
- func (use UsePrintResult) IgnoreErrors(errs ...error) UsePrintResult
- func (use UsePrintResult) IgnoreOkSubjects(subjects ...string) UsePrintResult
- func (use UsePrintResult) PostMiddleware() Middleware
- func (use UsePrintResult) PrintEgress() UsePrintResult
- func (use UsePrintResult) PrintIngress() UsePrintResult
- type WaitPingPong
Constants ¶
This section is empty.
Variables ¶
var ( ErrClosed = NewCustomError(2001, "service has been closed") ErrNotFound = NewCustomError(2100, "not found") ErrNotFoundSubject = NewCustomError(2101, "not found subject mux") )
Functions ¶
func AnyToString ¶
func CtxWithPingPong ¶
func CtxWithPingPong(ctx context.Context, v WaitPingPong) context.Context
func ErrorExtractCode ¶
func ErrorJoin3rdParty ¶
func GenerateRandomCode ¶
func GenerateUlid ¶
func GenerateUlid() string
func LookupLogLevel ¶
func PutMessage ¶
func PutMessage(message *Message)
func ReliableTask ¶
func SendPingWaitPong ¶
func SendPingWaitPong(sendPingSeconds int, sendPing func() error, waitPong WaitPingPong, isStopped func() bool) error
func SetDefaultLogger ¶
func SetDefaultLogger(l Logger)
func UseSkipMessage ¶
func WaitPingSendPong ¶
func WaitPingSendPong(waitPingSeconds int, waitPing WaitPingPong, sendPong func() error, isStop func() bool) error
Types ¶
type Adapter ¶
type Adapter struct {
// contains filtered or unexported fields
}
func (*Adapter) Identifier ¶
func (*Adapter) OnDisconnect ¶
type AdapterHub ¶
type AdapterOption ¶
type AdapterOption struct {
// contains filtered or unexported fields
}
func NewAdapterOption ¶
func NewAdapterOption() (opt *AdapterOption)
func (*AdapterOption) AdapterHub ¶
func (opt *AdapterOption) AdapterHub(hub AdapterHub) *AdapterOption
func (*AdapterOption) Build ¶
func (opt *AdapterOption) Build() (adp IAdapter, err error)
func (*AdapterOption) DecorateAdapter ¶
func (opt *AdapterOption) DecorateAdapter(wrap func(adapter IAdapter) (application IAdapter)) *AdapterOption
func (*AdapterOption) EgressMux ¶
func (opt *AdapterOption) EgressMux(mux *Mux) *AdapterOption
func (*AdapterOption) Identifier ¶
func (opt *AdapterOption) Identifier(identifier string) *AdapterOption
func (*AdapterOption) IngressMux ¶
func (opt *AdapterOption) IngressMux(mux *Mux) *AdapterOption
func (*AdapterOption) Lifecycle ¶
func (opt *AdapterOption) Lifecycle(setup func(life *Lifecycle)) *AdapterOption
func (*AdapterOption) Logger ¶
func (opt *AdapterOption) Logger(logger Logger) *AdapterOption
func (*AdapterOption) RawFixup ¶ added in v0.49.2
func (opt *AdapterOption) RawFixup(maxRetrySecond int, fixup func(IAdapter) error) *AdapterOption
func (*AdapterOption) RawInfra ¶
func (opt *AdapterOption) RawInfra(infra any) *AdapterOption
func (*AdapterOption) RawRecv ¶ added in v0.49.2
func (opt *AdapterOption) RawRecv(recv func(logger Logger) (message *Message, err error)) *AdapterOption
func (*AdapterOption) RawSend ¶ added in v0.49.2
func (opt *AdapterOption) RawSend(send func(logger Logger, message *Message) error) *AdapterOption
func (*AdapterOption) RawStop ¶ added in v0.49.2
func (opt *AdapterOption) RawStop(stop func(logger Logger) error) *AdapterOption
func (*AdapterOption) SendPing ¶
func (opt *AdapterOption) SendPing(sendPingSeconds int, waitPong WaitPingPong, sendPing func(IAdapter) error) *AdapterOption
SendPing
When SendPingWaitPong sends a ping message and waits for a corresponding pong message. SendPeriod = WaitSecond / 2
func (*AdapterOption) WaitPing ¶
func (opt *AdapterOption) WaitPing(waitPingSeconds int, waitPing WaitPingPong, sendPong func(IAdapter) error) *AdapterOption
WaitPing
When WaitPingSendPong waits for a ping message and response a corresponding pong message. SendPeriod = WaitSecond
type CustomError ¶
type CustomError struct {
// contains filtered or unexported fields
}
func NewCustomError ¶
func NewCustomError(myCode int, title string) *CustomError
func (*CustomError) CustomError ¶
func (c *CustomError) CustomError()
func (*CustomError) Error ¶
func (c *CustomError) Error() string
func (*CustomError) MyCode ¶
func (c *CustomError) MyCode() int
type HandleFunc ¶
func Link ¶
func Link(handler HandleFunc, middlewares ...Middleware) HandleFunc
func UseAdHocFunc ¶
func UseAdHocFunc(AdHoc HandleFunc) HandleFunc
func UsePrintDetail ¶
func UsePrintDetail() HandleFunc
func (HandleFunc) Link ¶
func (h HandleFunc) Link(middlewares ...Middleware) HandleFunc
func (HandleFunc) PostMiddleware ¶
func (h HandleFunc) PostMiddleware() Middleware
func (HandleFunc) PreMiddleware ¶
func (h HandleFunc) PreMiddleware() Middleware
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
func (*Hub) FindMultiByKey ¶
func (*Hub) IsShutdown ¶
func (*Hub) RemoveByKey ¶
func (*Hub) RemoveMulti ¶
If filter returns true, remove target
func (*Hub) RemoveMultiByKey ¶
func (*Hub) SetConcurrencyQty ¶
SetConcurrencyQty concurrencyQty controls how many tasks can run simultaneously, preventing resource usage or avoid frequent context switches.
func (*Hub) UpdateByOldKey ¶
func (*Hub) WaitShutdown ¶
func (hub *Hub) WaitShutdown() chan struct{}
type Lifecycle ¶
type Lifecycle struct {
// contains filtered or unexported fields
}
Lifecycle define a management mechanism when init obj and terminate obj.
func (*Lifecycle) OnDisconnect ¶
type Logger ¶
type Logger interface { Debug(format string, a ...any) Info(format string, a ...any) Warn(format string, a ...any) Error(format string, a ...any) Fatal(format string, a ...any) SetLogLevel(level LogLevel) LogLevel() LogLevel WithCallDepth(externalDepth uint) Logger WithKeyValue(key string, v any) Logger }
func CtxGetLogger ¶
func DefaultLogger ¶
func DefaultLogger() Logger
func SilentLogger ¶
func SilentLogger() Logger
type Message ¶
type Message struct { Subject string Bytes []byte Body any Mutex sync.Mutex // RouteParam are used to capture values from subject. // These parameters represent resources or identifiers. // // Example: // // define mux subject = "/users/{id}" // send or recv subject = "/users/1017" // // get route param: // key : value => id : 1017 RouteParam maputil.Data Metadata maputil.Data RawInfra any Ctx context.Context // contains filtered or unexported fields }
func GetMessage ¶
func GetMessage() *Message
type Middleware ¶
type Middleware func(next HandleFunc) HandleFunc
func UseAsync ¶
func UseAsync() Middleware
func UseCopyMessage ¶ added in v0.50.0
func UseCopyMessage() Middleware
func UseExclude ¶
func UseExclude(subjects []string) Middleware
func UseHowMuchTime ¶
func UseHowMuchTime() Middleware
func UseInclude ¶
func UseInclude(subjects []string) Middleware
func UseLogger ¶
func UseLogger(withMsgId bool, safeConcurrency SafeConcurrencyKind) Middleware
func UseRecover ¶
func UseRecover() Middleware
func UseRetry ¶
func UseRetry(retryMaxSecond int) Middleware
func (Middleware) Link ¶
func (mw Middleware) Link(handler HandleFunc) HandleFunc
type Mux ¶
type Mux struct {
// contains filtered or unexported fields
}
Mux refers to a router or multiplexer, which can be used to handle different message.
Message represents a high-level abstraction data structure containing metadata (e.g. header) + body
func NewMux ¶
NewMux If routeDelimiter is an empty string, Message.RouteParam cannot be used. RouteDelimiter can only be set to a string of length 1. This parameter determines different parts of the Message.Subject.
func (*Mux) DefaultHandler ¶
func (mux *Mux) DefaultHandler(h HandleFunc, mw ...Middleware) *Mux
DefaultHandler When a subject cannot be found, execute the 'Default'.
"The difference between 'Default' and 'NotFound' is that the 'Default' handler will utilize middleware, whereas 'NotFound' won't use middleware."
func (*Mux) EnableMessagePool ¶
func (*Mux) ErrorHandler ¶
func (mux *Mux) ErrorHandler(errHandlers ...Middleware) *Mux
func (*Mux) GroupByNumber ¶
func (*Mux) HandleMessage ¶
HandleMessage is also a HandleFunc, but with added routing capabilities.
func (*Mux) Handler ¶
func (mux *Mux) Handler(subject string, h HandleFunc, mw ...Middleware) *Mux
func (*Mux) HandlerByNumber ¶
func (mux *Mux) HandlerByNumber(subject int, h HandleFunc, mw ...Middleware) *Mux
func (*Mux) Middleware ¶
func (mux *Mux) Middleware(middlewares ...Middleware) *Mux
Middleware Before registering handler, middleware must be defined; otherwise, the handler won't be able to use middleware.
func (*Mux) NotFoundHandler ¶
func (mux *Mux) NotFoundHandler(h HandleFunc) *Mux
NotFoundHandler When a subject cannot be found, execute the 'NotFound'.
"The difference between 'Default' and 'NotFound' is that the 'Default' handler will utilize middleware, whereas 'NotFound' won't use middleware."
func (*Mux) PostMiddleware ¶
func (mux *Mux) PostMiddleware(handleFuncs ...HandleFunc) *Mux
func (*Mux) PreMiddleware ¶
func (mux *Mux) PreMiddleware(handleFuncs ...HandleFunc) *Mux
func (*Mux) Transform ¶
func (mux *Mux) Transform(transform HandleFunc) *Mux
Transform Originally, the message passed through the mux would only call 'getSubject' once. However, if there is a definition of Transform, when the message passes through the Transform function, 'getSubject' will be called again.
type SafeConcurrencyKind ¶
type SafeConcurrencyKind int
const ( SafeConcurrency_Skip SafeConcurrencyKind = iota SafeConcurrency_Mutex SafeConcurrency_Copy )
type Shutdown ¶
type Shutdown struct { Logger Logger // contains filtered or unexported fields }
func NewShutdown ¶
func NewShutdownWithoutTimeout ¶ added in v1.1.0
func NewShutdownWithoutTimeout() *Shutdown
func (*Shutdown) StopService ¶
func (*Shutdown) WaitChannel ¶ added in v1.1.0
func (s *Shutdown) WaitChannel() <-chan struct{}
type UsePrintResult ¶
type UsePrintResult struct {
// contains filtered or unexported fields
}
func (UsePrintResult) IgnoreErrSubjects ¶ added in v0.49.0
func (use UsePrintResult) IgnoreErrSubjects(subjects ...string) UsePrintResult
func (UsePrintResult) IgnoreErrors ¶ added in v0.49.0
func (use UsePrintResult) IgnoreErrors(errs ...error) UsePrintResult
func (UsePrintResult) IgnoreOkSubjects ¶ added in v0.49.0
func (use UsePrintResult) IgnoreOkSubjects(subjects ...string) UsePrintResult
func (UsePrintResult) PostMiddleware ¶ added in v0.49.0
func (use UsePrintResult) PostMiddleware() Middleware
func (UsePrintResult) PrintEgress ¶ added in v0.49.0
func (use UsePrintResult) PrintEgress() UsePrintResult
func (UsePrintResult) PrintIngress ¶ added in v0.49.0
func (use UsePrintResult) PrintIngress() UsePrintResult
type WaitPingPong ¶
type WaitPingPong chan struct{}
func CtxGetPingPong ¶
func CtxGetPingPong(ctx context.Context) WaitPingPong
func NewWaitPingPong ¶
func NewWaitPingPong() WaitPingPong
func (WaitPingPong) Ack ¶
func (wait WaitPingPong) Ack()