saslplainoauthmechanism

Version: v0.0.0-...-7204e3c
Published: Feb 27, 2025 | License: Apache-2.0 | Imports: 13 | Imported by: 0

README

saslplainoauthmechanism

saslplainoauthmechanism provides an implementation of the sasl.Mechanism interface from segmentio/kafka-go that handles authentication to Google Managed Kafka using OAuth Tokens from Application Default Credentials.

It allows you to use Authorization Tokens with SASL/Plain in segmentio/kafka-go, without requiring OAuthBearer support in the library.

Supported Credential Types

saslplainoauthmechanism supports the following Application Default Credentials types:

  1. GKE Workload Identity Federation.
    1. Note: Native Workload Identity Federation principals (principal:// format) are currently unsupported; you must link your Kubernetes Service Account to a Google Service Account.
  2. Metadata server credentials, such as those available on Google Compute Engine, Cloud Run, etc.
  3. gcloud CLI Application Default Credentials. Specifically the following subset:
    1. user_credentials (gcloud auth application-default login).
    2. impersonated_service_account (gcloud auth application-default login --impersonate-service-account=<sa email>).
    3. Other credential types, including service_account keys, are intentionally unsupported. To use Service Account Keys see here.

Usage

  1. Ensure you have granted the principal the Managed Kafka Client Role (see here).

  2. Create a Mechanism and pass it to the dialer.

    package main
    
    import (
        "context"
        "crypto/tls"
        "log"
    
        "github.com/googleapis/managedkafka/sasl-plain-access-token/segmentio/saslplainoauthmechanism"
        "github.com/segmentio/kafka-go"
    )
    
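    func main() {
        // Replace these with your cluster's bootstrap address and an existing topic.
        var bootStrapURL = "<broker FQDN>:9092"
        var topicName = "gmk-test"

        // NewADCMechanism uses Application Default Credentials and determines
        // the principal email automatically.
        mechanism, err := saslplainoauthmechanism.NewADCMechanism(context.Background())
        if err != nil {
            log.Fatalf("error creating mechanism: %v", err)
        }

        // An empty tls.Config enables TLS with the system root CAs.
        dialer := &kafka.Dialer{
            SASLMechanism: mechanism,
            TLS:           &tls.Config{},
        }

        w := kafka.NewWriter(kafka.WriterConfig{
            Brokers: []string{bootStrapURL},
            Topic:   topicName,
            Dialer:  dialer,
        })

        if err := w.WriteMessages(context.Background(), kafka.Message{Key: []byte("Key-A"), Value: []byte("Hello World!")}); err != nil {
            log.Fatalf("error writing message: %v", err)
        }

        // Close the writer to release its connections.
        if err := w.Close(); err != nil {
            log.Fatalf("error closing writer: %v", err)
        }
    }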
    
  3. You can optionally provide your own TokenSource and principal email:

    package main
    
    import (
        "context"
        "crypto/tls"
        "log"
    
        "github.com/googleapis/managedkafka/sasl-plain-access-token/segmentio/saslplainoauthmechanism"
        "github.com/segmentio/kafka-go"
        "golang.org/x/oauth2/google"
    )
    
    func main() {
        // Replace these with your cluster's bootstrap address and an existing topic.
        var bootStrapURL = "<broker FQDN>:9092"
        var topicName = "gmk-test"

        // Any TokenSource works: https://pkg.go.dev/golang.org/x/oauth2#TokenSource
        // Here it is taken from the credentials that FindDefaultCredentials locates.
        creds, err := google.FindDefaultCredentials(context.Background(), "https://www.googleapis.com/auth/cloud-platform")
        if err != nil {
            log.Fatalf("error finding credentials: %v", err)
        }

        // The principal email is supplied explicitly rather than detected.
        mechanism, err := saslplainoauthmechanism.NewMechanismWithTokenSource(context.Background(), creds.TokenSource, "<principal email>")
        if err != nil {
            log.Fatalf("error creating mechanism: %v", err)
        }

        // An empty tls.Config enables TLS with the system root CAs.
        dialer := &kafka.Dialer{
            SASLMechanism: mechanism,
            TLS:           &tls.Config{},
        }

        w := kafka.NewWriter(kafka.WriterConfig{
            Brokers: []string{bootStrapURL},
            Topic:   topicName,
            Dialer:  dialer,
        })

        if err := w.WriteMessages(context.Background(), kafka.Message{Key: []byte("Key-A"), Value: []byte("Hello World!")}); err != nil {
            log.Fatalf("error writing message: %v", err)
        }

        // Close the writer to release its connections.
        if err := w.Close(); err != nil {
            log.Fatalf("error closing writer: %v", err)
        }
    }
    

Documentation

Types

type Mechanism

type Mechanism struct {
	// contains filtered or unexported fields
}

Mechanism is a sasl.Mechanism implementation that provides a valid Google access token as the SASL/Plain password.

func NewADCMechanism

func NewADCMechanism(ctx context.Context) (*Mechanism, error)

NewADCMechanism returns a Mechanism that uses Application Default Credentials as the token source and automatically determines the principal email address.

func NewMechanismWithTokenSource

func NewMechanismWithTokenSource(ctx context.Context, ts oauth2.TokenSource, principalEmail string) (*Mechanism, error)

NewMechanismWithTokenSource returns a Mechanism that uses a custom TokenSource and a static principal email.

func (*Mechanism) Name

func (*Mechanism) Name() string

func (*Mechanism) Next

func (m *Mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error)

func (*Mechanism) Start

func (m *Mechanism) Start(ctx context.Context) (sasl.StateMachine, []byte, error)
