Documentation
¶
Index ¶
- Constants
- func ControlMessageHandler(ctx context.Context, kafkaWriter *kafka.Writer, topicVerifier *TopicVerifier) func(MQTT.Client, MQTT.Message)
- func CreateBrokerConnection(brokerUrl string, brokerConfigFuncs ...MqttClientOptionsFunc) (MQTT.Client, error)
- func DataMessageHandler() func(MQTT.Client, MQTT.Message)
- func DefaultMessageHandler(topicVerifier *TopicVerifier, ...) func(client MQTT.Client, message MQTT.Message)
- func NewBrokerOptions(brokerUrl string, opts ...MqttClientOptionsFunc) (*MQTT.ClientOptions, error)
- func NewConnectorClientMQTTProxyFactory(cfg *config.Config, mqttClient MQTT.Client, topicBuilder *TopicBuilder) (controller.ConnectorClientProxyFactory, error)
- func SendReconnectMessageToClient(mqttClient MQTT.Client, logger *logrus.Entry, topicBuilder *TopicBuilder, ...) error
- type ConnectorClientMQTTProxy
- func (cc *ConnectorClientMQTTProxy) Disconnect(ctx context.Context, message string) error
- func (cc *ConnectorClientMQTTProxy) GetCanonicalFacts(ctx context.Context) (domain.CanonicalFacts, error)
- func (cc *ConnectorClientMQTTProxy) GetDispatchers(ctx context.Context) (domain.Dispatchers, error)
- func (cc *ConnectorClientMQTTProxy) GetTags(ctx context.Context) (domain.Tags, error)
- func (cc *ConnectorClientMQTTProxy) Ping(ctx context.Context) error
- func (cc *ConnectorClientMQTTProxy) Reconnect(ctx context.Context, message string, delay int) error
- func (cc *ConnectorClientMQTTProxy) SendMessage(ctx context.Context, directive string, metadata interface{}, ...) (*uuid.UUID, error)
- type ConnectorClientMQTTProxyFactory
- type MqttClientOptionsFunc
- func WithAutoReconnect(autoReconnect bool) MqttClientOptionsFunc
- func WithCleanSession(cleanSession bool) MqttClientOptionsFunc
- func WithClientID(clientID string) MqttClientOptionsFunc
- func WithConnectionLostHandler(handler func(MQTT.Client, error)) MqttClientOptionsFunc
- func WithDefaultPublishHandler(msgHdlr MQTT.MessageHandler) MqttClientOptionsFunc
- func WithJwtAsHttpHeader(tokenGenerator jwt_utils.JwtGenerator) MqttClientOptionsFunc
- func WithJwtReconnectingHandler(tokenGenerator jwt_utils.JwtGenerator) MqttClientOptionsFunc
- func WithOnConnectHandler(handler func(MQTT.Client)) MqttClientOptionsFunc
- func WithProtocolVersion(protocolVersion uint) MqttClientOptionsFunc
- func WithResumeSubs(resumeSubs bool) MqttClientOptionsFunc
- func WithTlsConfig(tlsConfig *tls.Config) MqttClientOptionsFunc
- type Subscriber
- type TopicBuilder
- type TopicType
- type TopicVerifier
Constants ¶
View Source
const ( TopicKafkaHeaderKey = "topic" MessageIDKafkaHeaderKey = "mqtt_message_id" DateReceivedHeaderKey = "date_received" )
Variables ¶
This section is empty.
Functions ¶
func ControlMessageHandler ¶
func CreateBrokerConnection ¶
func CreateBrokerConnection(brokerUrl string, brokerConfigFuncs ...MqttClientOptionsFunc) (MQTT.Client, error)
func DefaultMessageHandler ¶
func NewBrokerOptions ¶
func NewBrokerOptions(brokerUrl string, opts ...MqttClientOptionsFunc) (*MQTT.ClientOptions, error)
func NewConnectorClientMQTTProxyFactory ¶
func NewConnectorClientMQTTProxyFactory(cfg *config.Config, mqttClient MQTT.Client, topicBuilder *TopicBuilder) (controller.ConnectorClientProxyFactory, error)
Types ¶
type ConnectorClientMQTTProxy ¶
type ConnectorClientMQTTProxy struct { Logger *logrus.Entry Config *config.Config OrgID domain.OrgID AccountID domain.AccountID ClientID domain.ClientID Client MQTT.Client TopicBuilder *TopicBuilder Dispatchers domain.Dispatchers CanonicalFacts domain.CanonicalFacts Tags domain.Tags }
func (*ConnectorClientMQTTProxy) Disconnect ¶
func (cc *ConnectorClientMQTTProxy) Disconnect(ctx context.Context, message string) error
func (*ConnectorClientMQTTProxy) GetCanonicalFacts ¶
func (cc *ConnectorClientMQTTProxy) GetCanonicalFacts(ctx context.Context) (domain.CanonicalFacts, error)
func (*ConnectorClientMQTTProxy) GetDispatchers ¶
func (cc *ConnectorClientMQTTProxy) GetDispatchers(ctx context.Context) (domain.Dispatchers, error)
func (*ConnectorClientMQTTProxy) Ping ¶
func (cc *ConnectorClientMQTTProxy) Ping(ctx context.Context) error
func (*ConnectorClientMQTTProxy) SendMessage ¶
type ConnectorClientMQTTProxyFactory ¶
type ConnectorClientMQTTProxyFactory struct {
// contains filtered or unexported fields
}
func (*ConnectorClientMQTTProxyFactory) CreateProxy ¶
func (ccpf *ConnectorClientMQTTProxyFactory) CreateProxy(ctx context.Context, orgID domain.OrgID, account domain.AccountID, client_id domain.ClientID, canonicalFacts domain.CanonicalFacts, dispatchers domain.Dispatchers, tags domain.Tags) (controller.ConnectorClient, error)
type MqttClientOptionsFunc ¶
type MqttClientOptionsFunc func(*MQTT.ClientOptions) error
func WithAutoReconnect ¶
func WithAutoReconnect(autoReconnect bool) MqttClientOptionsFunc
func WithCleanSession ¶
func WithCleanSession(cleanSession bool) MqttClientOptionsFunc
func WithClientID ¶
func WithClientID(clientID string) MqttClientOptionsFunc
func WithConnectionLostHandler ¶
func WithConnectionLostHandler(handler func(MQTT.Client, error)) MqttClientOptionsFunc
func WithDefaultPublishHandler ¶
func WithDefaultPublishHandler(msgHdlr MQTT.MessageHandler) MqttClientOptionsFunc
func WithJwtAsHttpHeader ¶
func WithJwtAsHttpHeader(tokenGenerator jwt_utils.JwtGenerator) MqttClientOptionsFunc
func WithJwtReconnectingHandler ¶
func WithJwtReconnectingHandler(tokenGenerator jwt_utils.JwtGenerator) MqttClientOptionsFunc
func WithOnConnectHandler ¶
func WithOnConnectHandler(handler func(MQTT.Client)) MqttClientOptionsFunc
func WithProtocolVersion ¶
func WithProtocolVersion(protocolVersion uint) MqttClientOptionsFunc
func WithResumeSubs ¶
func WithResumeSubs(resumeSubs bool) MqttClientOptionsFunc
func WithTlsConfig ¶
func WithTlsConfig(tlsConfig *tls.Config) MqttClientOptionsFunc
type Subscriber ¶
type Subscriber struct { Topic string EntryPoint MQTT.MessageHandler Qos byte }
type TopicBuilder ¶
type TopicBuilder struct {
// contains filtered or unexported fields
}
func NewTopicBuilder ¶
func NewTopicBuilder(prefix string) *TopicBuilder
func (*TopicBuilder) BuildIncomingWildcardControlTopic ¶
func (tb *TopicBuilder) BuildIncomingWildcardControlTopic() string
func (*TopicBuilder) BuildIncomingWildcardDataTopic ¶
func (tb *TopicBuilder) BuildIncomingWildcardDataTopic() string
func (*TopicBuilder) BuildOutgoingControlTopic ¶
func (tb *TopicBuilder) BuildOutgoingControlTopic(clientID domain.ClientID) string
func (*TopicBuilder) BuildOutgoingDataTopic ¶
func (tb *TopicBuilder) BuildOutgoingDataTopic(clientID domain.ClientID) string
type TopicVerifier ¶
type TopicVerifier struct {
// contains filtered or unexported fields
}
func NewTopicVerifier ¶
func NewTopicVerifier(prefix string) *TopicVerifier
func (*TopicVerifier) VerifyIncomingTopic ¶
Click to show internal directories.
Click to hide internal directories.