Documentation
¶
Index ¶
- Constants
- func CreateTopics(brokerAddress string, dialer *kafka.Dialer, topics []string) error
- func DeleteTopics(brokerAddress string, dialer *kafka.Dialer, topics []string) error
- func GetGroupId(idElements []string) string
- func HasTopics(brokerAddress string, dialer *kafka.Dialer, topics []string) (bool, error)
- func IsInitRequestforFinish(command int8) bool
- func IsInitRequestforStart(command int8) bool
- type InitCommand
- func (ic *InitCommand) CreateInitRequestForFinish() (*kafka.Message, error)
- func (ic *InitCommand) CreateInitRequestforStart() (*kafka.Message, error)
- func (ic *InitCommand) CreateInitResponseForCreateTopic(errMessage string) (*kafka.Message, error)
- func (ic *InitCommand) CreateInitResponseForNotFindData() (*kafka.Message, error)
- func (ic *InitCommand) GetGroupId() string
- func (ic *InitCommand) IsInitResponseforCreateTopic(message *kafka.Message) (bool, error)
- func (ic *InitCommand) IsInitResponseforNotFindData(message *kafka.Message) (bool, error)
- type InitCommandMessage
- type MessageQueueConfig
- type MessageQueueInitTopics
- type MessageQueueTopic
Constants ¶
View Source
const ( DefaultMaxBatchBytes = 1e6 DefaultUpdatePublishChannelBufferSize = 1000 )
View Source
const (
InitResponseTimeout = 1 * time.Minute
)
Variables ¶
This section is empty.
Functions ¶
func CreateTopics ¶
func DeleteTopics ¶
[TODO?] deleteTopics() is not called in CloseMessageQueues(), because deleting the topics created by a publisher would disconnect its subscribers from those topics if the publisher restarted.
func GetGroupId ¶
func IsInitRequestforFinish ¶
func IsInitRequestforStart ¶
Types ¶
type InitCommand ¶
type InitCommand struct {
// contains filtered or unexported fields
}
func NewInitCommand ¶
func NewInitCommand(groupId string) *InitCommand
func (*InitCommand) CreateInitRequestForFinish ¶
func (ic *InitCommand) CreateInitRequestForFinish() (*kafka.Message, error)
func (*InitCommand) CreateInitRequestforStart ¶
func (ic *InitCommand) CreateInitRequestforStart() (*kafka.Message, error)
func (*InitCommand) CreateInitResponseForCreateTopic ¶
func (ic *InitCommand) CreateInitResponseForCreateTopic( errMessage string) (*kafka.Message, error)
func (*InitCommand) CreateInitResponseForNotFindData ¶
func (ic *InitCommand) CreateInitResponseForNotFindData() ( *kafka.Message, error)
func (*InitCommand) GetGroupId ¶
func (ic *InitCommand) GetGroupId() string
func (*InitCommand) IsInitResponseforCreateTopic ¶
func (ic *InitCommand) IsInitResponseforCreateTopic( message *kafka.Message) (bool, error)
func (*InitCommand) IsInitResponseforNotFindData ¶
func (ic *InitCommand) IsInitResponseforNotFindData( message *kafka.Message) (bool, error)
type InitCommandMessage ¶
type InitCommandMessage struct { Command int8 `json:"command"` GroupId string `json:"filePath"` ErrMessage string `json:"errMessage,omitempty"` }
func DecodeInitCommand ¶
func DecodeInitCommand( message *kafka.Message) (*InitCommandMessage, error)
type MessageQueueConfig ¶
type MessageQueueConfig struct {
// contains filtered or unexported fields
}
func GetMessageQueueConfig ¶
func GetMessageQueueConfig( dataClient coreclientset.Interface, kubeClient kubernetes.Interface, messageQueueRef *corev1.ObjectReference) (*MessageQueueConfig, error)
func NewMessageQueueConfig ¶
func NewMessageQueueConfig( brokers []string, user string, password string) *MessageQueueConfig
func (*MessageQueueConfig) CreateSaslDialer ¶
func (mqc *MessageQueueConfig) CreateSaslDialer() (*kafka.Dialer, error)
func (*MessageQueueConfig) GetBrokers ¶
func (mqc *MessageQueueConfig) GetBrokers() []string
func (*MessageQueueConfig) GetPassword ¶
func (mqc *MessageQueueConfig) GetPassword() string
func (*MessageQueueConfig) GetUser ¶
func (mqc *MessageQueueConfig) GetUser() string
type MessageQueueInitTopics ¶
type MessageQueueTopic ¶
type MessageQueueTopic struct {
// contains filtered or unexported fields
}
func NewMessageQueueFileSystemTopic ¶
func NewMessageQueueFileSystemTopic(nameElements []string) *MessageQueueTopic
func NewMessageQueueRdbTopic ¶
func NewMessageQueueRdbTopic(nameElements []string) *MessageQueueTopic
func (*MessageQueueTopic) CreateInitTopics ¶
func (mqt *MessageQueueTopic) CreateInitTopics() ( *MessageQueueInitTopics, error)
func (*MessageQueueTopic) CreateUpdateTopic ¶
func (mqt *MessageQueueTopic) CreateUpdateTopic() (string, error)
Click to show internal directories.
Click to hide internal directories.