k2v

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

README

garage-k2v-go

Go client for K2V, an experimental small object key-value storage engine built as part of Garage.

Because the K2V API is not stable, breaking changes in this Go module should be expected until then.

Import

import k2v "code.notaphish.fyi/milas/garage-k2v-go"

Create API client

// Read K2V_ENDPOINT from OS environment variable, e.g. http://localhost:3904.
endpoint := k2v.EndpointFromEnv()

// Read K2V_KEY_ID and K2V_KEY_SECRET from OS environment variables.
key := k2v.KeyFromEnv()

// Alternatively, construct a key by initializing the ID and secret fields on a k2v.Key.
// key := k2v.Key{ID: "GK...", Secret: "..."}

client := k2v.NewClient(endpoint, key)

Operations

type Client
    func NewClient(endpoint string, key Key, opts ...ClientOption) *Client
    func (c *Client) Clone(opts ...ClientOption) *Client
    func (c *Client) Close()
    func (c *Client) DeleteItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken) error
    func (c *Client) InsertBatch(ctx context.Context, b Bucket, items []BatchInsertItem) error
    func (c *Client) InsertItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, item []byte) error
    func (c *Client) PollItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, timeout time.Duration) (Item, CausalityToken, error)
    func (c *Client) PollRange(ctx context.Context, b Bucket, pk string, q PollRangeQuery, timeout time.Duration) (*PollRangeResponse, error)
    func (c *Client) ReadBatch(ctx context.Context, b Bucket, q []ReadBatchSearch) ([]BatchSearchResult, error)
    func (c *Client) ReadIndex(ctx context.Context, b Bucket, q ReadIndexQuery) (*ReadIndexResponse, error)
    func (c *Client) ReadItemMulti(ctx context.Context, b Bucket, pk string, sk string) ([]Item, CausalityToken, error)
    func (c *Client) ReadItemSingle(ctx context.Context, b Bucket, pk string, sk string) (Item, CausalityToken, error)

Scrolling (Client-side / Go API)

To handle iteration in the K2V API, helper functions for simple cases are provided.

For example, to perform a bulk search:

handleBatch := func(result *k2v.BatchSearchResult) error {
  log.Println(result.Items)
  return nil
}
err := k2v.ScrollBatchSearch(ctx, f.cli, f.bucket, []k2v.BatchSearch{
  {
    PartitionKey: "pk1",
  },
  {
    PartitionKey: "pk2",
    Limit:        1,
  },
}, handleBatch)

This will repeatedly make calls to ReadBatch (batch search), using nextStart from the responses to generate subsequent requests until all queries are exhausted.

See ScrollIndex(ctx context.Context, client IndexScroller, b Bucket, query ReadIndexQuery, fn ReadIndexResponseHandler) error for the equivalent for batch index reads.

No helper is available for PollRange() yet.

Integration Tests

K2V_ENDPOINT="http://[::1]:3904" \
K2V_KEY_ID="GK..." \
K2V_KEY_SECRET="..." \
  go test ./...

Usage

Review the K2V API spec and the integration tests in this module for complete examples.

License

Go API client licensed under Apache 2.0

Documentation

Index

Examples

Constants

View Source
const CausalityTokenHeader = "X-Garage-Causality-Token"
View Source
const EnvVarEndpoint = "K2V_ENDPOINT"
View Source
const EnvVarKeyID = "K2V_KEY_ID"
View Source
const EnvVarKeySecret = "K2V_KEY_SECRET"

Variables

View Source
var ConcurrentItemsErr = errors.New("item has multiple concurrent values")
View Source
var NoSuchItemErr = errors.New("item does not exist")
View Source
var NotModifiedTimeoutErr = errors.New("not modified within timeout")
View Source
var StopScroll = errors.New("scroll canceled")
View Source
var TombstoneItemErr = errors.New("item is a tombstone")

Functions

func EndpointFromEnv

func EndpointFromEnv() string

func ScrollBatchSearch added in v0.1.1

func ScrollBatchSearch(ctx context.Context, client BatchSearchScroller, b Bucket, q []BatchSearch, fn BatchSearchResultHandler) error

func ScrollIndex added in v0.1.1

ScrollIndex calls the ReadIndex API serially, invoking the provided function for each response (batch) until there are no more results.

Example
ctx := context.Background()
client := k2v.NewClient(k2v.EndpointFromEnv(), k2v.KeyFromEnv())
defer client.Close()
const bucket = "k2v-test"

pkPrefix := randomPk()
for i := range 5 {
	_ = client.InsertItem(ctx, bucket, pkPrefix+"-"+strconv.Itoa(i), randomSk(), "", []byte("hello"))
}

var responses []*k2v.ReadIndexResponse
_ = k2v.ScrollIndex(ctx, client, bucket, k2v.ReadIndexQuery{Prefix: pkPrefix, Limit: 25}, func(resp *k2v.ReadIndexResponse) error {
	responses = append(responses, resp)
	return nil
})
fmt.Println(len(responses[0].PartitionKeys))
Output:

5

Types

type BatchInsertItem

type BatchInsertItem struct {
	PartitionKey   string  `json:"pk"`
	SortKey        string  `json:"sk"`
	CausalityToken *string `json:"ct"`
	Value          Item    `json:"v"`
}

type BatchSearch added in v0.1.1

type BatchSearch struct {
	PartitionKey string `json:"partitionKey"`

	// Prefix restricts listing to partition keys that start with this value.
	Prefix string `json:"prefix,omitempty"`

	// Start is the first partition key to list, in lexicographical order.
	Start string `json:"start,omitempty"`

	// End is the last partition key to list (excluded).
	End string `json:"end,omitempty"`

	// Limit for maximum number of partition keys to list.
	Limit int `json:"limit,omitempty"`

	// Reverse iterates in reverse lexicographical order.
	Reverse bool `json:"reverse,omitempty"`

	// SingleItem determines whether to return only the item with sort key start.
	SingleItem bool `json:"singleItem,omitempty"`

	// ConflictsOnly determines whether to return only items that have several concurrent values.
	ConflictsOnly bool `json:"conflictsOnly,omitempty"`

	// Tombstones determines whether or not to return tombstone lines to indicate the presence of old deleted items.
	Tombstones bool `json:"tombstones,omitempty"`
}

type BatchSearchResult

type BatchSearchResult struct {
	PartitionKey  string             `json:"partitionKey"`
	Prefix        *string            `json:"prefix"`
	Start         *string            `json:"start"`
	End           *string            `json:"end"`
	Limit         *int               `json:"limit"`
	Reverse       bool               `json:"reverse"`
	SingleItem    bool               `json:"singleItem"`
	ConflictsOnly bool               `json:"conflictsOnly"`
	Tombstones    bool               `json:"tombstones"`
	Items         []SearchResultItem `json:"items"`
	More          bool               `json:"more"`
	NextStart     *string            `json:"nextStart"`
}

type BatchSearchResultHandler added in v0.1.1

type BatchSearchResultHandler func(result *BatchSearchResult) error

type BatchSearchScroller added in v0.1.1

type BatchSearchScroller interface {
	ReadBatch(ctx context.Context, b Bucket, q []BatchSearch) ([]*BatchSearchResult, error)
}

type Bucket

type Bucket string

type BulkGetItem

type BulkGetItem struct {
	PartitionKey   string
	SortKey        string
	CausalityToken CausalityToken
	Values         []Item
}

func BulkGet

func BulkGet(ctx context.Context, cli *Client, b Bucket, keys []ItemKey) ([]BulkGetItem, error)

type CausalityToken

type CausalityToken string

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(endpoint string, key Key, opts ...ClientOption) *Client

func (*Client) Clone

func (c *Client) Clone(opts ...ClientOption) *Client

func (*Client) Close

func (c *Client) Close()

func (*Client) DeleteItem

func (c *Client) DeleteItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken) error

func (*Client) InsertBatch

func (c *Client) InsertBatch(ctx context.Context, b Bucket, items []BatchInsertItem) error

func (*Client) InsertItem

func (c *Client) InsertItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, item []byte) error

func (*Client) PollItem

func (c *Client) PollItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, timeout time.Duration) (Item, CausalityToken, error)

func (*Client) PollRange added in v0.1.1

func (c *Client) PollRange(ctx context.Context, b Bucket, pk string, q PollRangeQuery, timeout time.Duration) (*PollRangeResponse, error)

func (*Client) ReadBatch

func (c *Client) ReadBatch(ctx context.Context, b Bucket, q []BatchSearch) ([]*BatchSearchResult, error)

func (*Client) ReadIndex

func (c *Client) ReadIndex(ctx context.Context, b Bucket, q ReadIndexQuery) (*ReadIndexResponse, error)

func (*Client) ReadItemMulti

func (c *Client) ReadItemMulti(ctx context.Context, b Bucket, pk string, sk string) ([]Item, CausalityToken, error)

func (*Client) ReadItemSingle

func (c *Client) ReadItemSingle(ctx context.Context, b Bucket, pk string, sk string) (Item, CausalityToken, error)

type ClientOption

type ClientOption func(*Client)

func WithHTTPClient

func WithHTTPClient(httpClient *http.Client) ClientOption

func WithRequestMiddleware

func WithRequestMiddleware(middleware ...RequestMiddleware) ClientOption

type IndexScroller added in v0.1.1

type IndexScroller interface {
	ReadIndex(ctx context.Context, b Bucket, q ReadIndexQuery) (*ReadIndexResponse, error)
}

type Item

type Item []byte

func (Item) GoString

func (i Item) GoString() string

type ItemKey

type ItemKey struct {
	PartitionKey string
	SortKey      string
}

type Key

type Key struct {
	ID     string
	Secret string
}

func KeyFromEnv

func KeyFromEnv() Key

type PollRangeQuery added in v0.1.1

type PollRangeQuery struct {
	// Prefix restricts items to poll to those whose sort keys start with this prefix.
	Prefix string `json:"prefix,omitempty"`

	// Start is the sort key of the first item to poll.
	Start string `json:"start,omitempty"`

	// End is the sort key of the last item to poll (excluded).
	End string `json:"end,omitempty"`

	// SeenMarker is an opaque string returned by a previous PollRange call, that represents items already seen.
	SeenMarker string `json:"seenMarker,omitempty"`
}

type PollRangeResponse added in v0.1.1

type PollRangeResponse struct {
	SeenMarker string             `json:"seenMarker"`
	Items      []SearchResultItem `json:"items"`
}

type ReadIndexQuery

type ReadIndexQuery struct {
	// Prefix restricts listing to partition keys that start with this value.
	Prefix string

	// Start is the first partition key to list, in lexicographical order.
	Start string

	// End is the last partition key to list (excluded).
	End string

	// Limit for maximum number of partition keys to list.
	Limit int

	// Reverse iterates in reverse lexicographical order.
	Reverse bool
}

type ReadIndexResponse

type ReadIndexResponse struct {
	Prefix        *string                         `json:"prefix"`
	Start         *string                         `json:"start"`
	End           *string                         `json:"end"`
	Limit         *int                            `json:"limit"`
	Reverse       bool                            `json:"reverse"`
	PartitionKeys []ReadIndexResponsePartitionKey `json:"partitionKeys"`
	More          bool                            `json:"more"`
	NextStart     *string                         `json:"nextStart"`
}

type ReadIndexResponseHandler added in v0.1.1

type ReadIndexResponseHandler func(resp *ReadIndexResponse) error

ReadIndexResponseHandler is invoked for each batch of index read results.

If an error is returned, scrolling is halted and the error is propagated. The sentinel value StopScroll can be returned to end iteration early without propagating an error.

type ReadIndexResponsePartitionKey

type ReadIndexResponsePartitionKey struct {
	PK        string `json:"pk"`
	Entries   int    `json:"entries"`
	Conflicts int    `json:"conflicts"`
	Values    int    `json:"values"`
	Bytes     int    `json:"bytes"`
}

type RequestMiddleware

type RequestMiddleware func(*http.Request) error

type SearchResultItem

type SearchResultItem struct {
	SortKey        string `json:"sk"`
	CausalityToken string `json:"ct"`
	Values         []Item `json:"v"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳