client

package
v0.2.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2025 License: BSD-2-Clause, Unlicense Imports: 29 Imported by: 0

README

core Go API

[TOC]

The Go implementation of the agent API should be considered the "reference" implementation. Our other implementations (python) seek to implement the same user-facing interfaces and pattern with language-specific concessions as needed.

The Agent API

Quick Summary

Derive your agent from a *client.BasicAgent, and fulfill the ManagedAgent interface.

type MyAgent struct {
  *client.BasicAgent
}

func (m *MyAgent) Setup() error {
  // Implement your agent setup here
  // Make any calls to ListenFor here
  return nil
}

func (m *MyAgent) Loop() (bool, error) {
  // Implement your agent's data processing here
  return true, nil //return true if you want to continue looping
  //return false, nil //return false if you want to stop loop
}
// Essential agent API for BB communication
func (a *MyAgent) Post(metadata []byte, data []byte, tags string...) error
func (a *MyAgent) Reply(post *client.Post, metadata []byte, data []byte, tags string...) error
func (a *MyAgent) ListenFor(tags...) <-chan *Post
func (a *MyAgent) Log(severity core.Log_Severity, msg string)
// Additional API provided by BasicAgent
func (a *MyAgent) Trace(post *client.Post) ([]*client.Post, error)
// Run your agent (must fulfill the managed agent interface)
my_agent := &MyAgent{
  BasicAgent: client.GetDefaultAgent(blackboard_address)
}
err := client.RunUntilComplete(my_agent)
if err != nil {
  log.Fatal(err)
}
Detailed Explanation
Importing the client package

We suggest the following for importing the client package:

import client "gitlab.com/hoffman-lab/core/cmd/client-apis/go"`
The BasicAgent type

To make an agent, embed a pointer to a BasicAgent into your structure.

type MyAgent struct {
  *client.BasicAgent
}

BasicAgent provides the essential components for an agent including but not limited to posting, replying, and receiving data from the blackboard. You are not required to utilize BasicAgent to communicate with the blackboard however we cannot provide support at this point for those more advanced use cases.

Agent lifecycle

BasicAgent actually runs several processes concurrently, which can leave dangling goroutines and unfreed memory if we're not careful. As a result, we have implemented an agent lifecycle that can handle this cleanup for you.

Utilize the lifecycle management

To utilize the lifecycle, you must implement two functions.

These allow your agent to fulfill the ManagedAgent interface.

func (a *MyAgent) Setup() error {
  return nil
}

func (a *MyAgent) Loop() (bool, error) {
  return true, nil
}

(This pattern is inspired by the Arduino lifecycle)

Setup

Setup is executed once, prior to any communication with the blackboard. Do not call any blackboard communication methods inside of setup. Although they will succeed, this interferes with proper message delivery using the ListenFor method we provide.

Any agent initialization you'd like to perform prior to communicating with the blackboard should be performed here. Additionally, any calls to ListenFor should be made in Setup.

Returning an error from Setup will prevent the agent from starting.

Loop

Loop will be executed infinitely many times until it is told to stop by returning false. Persistence can be achieved by always returning true. Returning an error in the second return parameter from a Loop does not itself cause the agent to stop executing.

Inside of the loop is where your agent should listen for messages, process data, and communicate with the blackboard.

Run your agent using our lifecycle management

An agent that has properly implemented a Setup and Loop method can then be managed using the RunUntilComplete function.

my_agent := &MyAgent{
  BasicAgent: client.GetDefaultAgent(bb_address),
}
err := client.RunUntilComplete(my_agent)
if err != nil {
  log.Fatal("agent returned error:", err)
}

The GetDefaultAgent function returns a properly initialized BasicAgent ready to connect to the blackboard running at bb_address.

Blackboard Communication

The core agent API is fundamentally three methods: Post, Reply, and ListenFor. These three methods provide all key functionality for multiagent systems.

Post

Analogous to a real blackboard, post just means "put something on the blackboard".

tags := []string{"test-message"}
err := my_agent.Post([]byte("some metadata"), []byte("some data data"), tags...)
Tagging

This version of core changes how messages are exchanged between agents.

We have ultimately done away with attributes and promises in favor of a tagging system.

Agents post their messages with a set of tags and other agents listen for a set of tags. Each matching message will surface into the appropriate ListenFor channel. (There are deep and obvious similarities here to traditional pub/sub architectures)

Messages will match if the incoming message contains at least the tags provided to ListenFor. E.g.,

ListenFor("test") matches Post(md, d, "test")
ListenFor("test") also matches Post(md, d, "test", "tag2")

ListenFor("test", "tag2") matches Post(md, d, "test", "tag2")
ListenFor("test", "tag2") does **not** match Post(md, d, "test")

This means that without care in tagging there can easily be agent "cross talk" particularly with multiple agents performing the same or similar tasks. This is intentional and by design and places a great deal of control and flexibility in the hands of the user although new users may find it challenging. Please utilize our issues page for feedback and assistance with the new tagging system if it is unclear, or some additional instruction, guidance, or debugging help is required.

ListenFor

Many agents will want to receive some data from the blackboard to begin executing. ListenFor will provide you with a go channel on which messages with matching tags will arrive. These can then be utilized as normal go channels to automatically receive data from the blackboard.

IMPORTANT: To ensure proper behavior, ListenFor must be called in Setup, before any messages have been sent to the blackboard.

Here is an example from our ping pong agents:

type PingAgent2 struct {
	prime_bb sync.Once // bootstraps the ping/pong routine by sending a ping message once
	pong_ch  <-chan *client.Post
	*client.BasicAgent
}

func (a *PingAgent2) Setup() error {
	a.pong_ch = a.ListenFor([]string{"pong-message"})
	return a.sendPing()
}

func (a *PingAgent2) Loop() (cont bool, err error) {
	select {
	case post := <-a.pong_ch:

		// channel has been closed externally
		if post == nil {
			return false, nil
		}

		if err := a.sendPing(); err != nil {
			return false, err
		}
	case <-time.After(10 * time.Second):
		a.Log(core.Log_Info, "ping agent heartbeat")
	}
	return true, nil
}
Reply

ListenFor returns a Post object. The post has metadata and data that can be used as desired, but importantly, the post object can be used to begin a "chain" of posts using the reply mechanism. A reply actually just shows up on the blackboard as a post, but you'll notice that it has one or more messages listed in the "replying_to" field. It also creates very logical and light links between messages on the blackboard that enable techniques such as tracing.

Here is an example from our Pong agent:

// Pong listens for Pings and replies with pongs
func (a *PongAgent) Loop() (cont bool, err error) {
	select {
	case post := <-a.ping_ch:

		if post == nil {
			return false, nil
		}

		time.Sleep(a.delay_ms)
		err := a.Reply([]*client.Post{post}, nil, nil, "pong-message")
		if err != nil {
			return false, err
		}
	case <-time.After(10 * time.Second):
		a.Log(core.Log_Info, "pong agent heartbeat")
	}

	return true, nil
}
Tracing
func (a *BasicAgent) Trace(post *Post) ([]*Post, error) {

Tracing in a core blackboard allows an agent to retrieve messages via a chain of replies; effectively, we walk the "linked list" of messages and give them back to the tracer.

Here is an example from our (TODO) agent:

// TODO: this is an agent example that we should implement!

Tracing is not part of the core agent interface, however is a special feature of the core base agent.

NOTE: Tracing is a method that exists slightly outside of the core agent API and is a service provided by the core blackboard (some of the secret sauce of our implementation we hope). In fact, although we have placed it into Blackboard interface, certain blackboards implementations may wish to not provide or return useful tracing information.

Logging

Logging is, strictly speaking, not a necessary part of the core protocol, however logging is so ubiquitously useful during development, execution, and monitoring that we'd be hard pressed not to include it. Defining this message type allows us to separate messages meant for agents (posts) which may or may not be human readable, from posts that should be human-readable (logs).

func (a *BasicAgent) Log(severity core.LogSeverity, message string) ([]*Post, error) {

The default severity everyone should use is core.Log_Info. This is a bread and butter log message.

We encourage the following guidelines for the other severities:

core.Log_Debug:    should not appear during normal logging
core.Log_Info:     normal logging
core.Log_Warning:  possible with errors, but can continue
core.Log_Error:    normal operation is not occuring
core.Log_Critical: 🔥🔥🔥 (possibly trigger a halt and catch fire?)
Hello and Goodbye

If using the managed lifecycle, you don't need to worry about sending Hello and Goodbye messages. If you're implementing a more "raw" agent, these messages indicate an agent's first and last connection to the blackboard, and are one way to trigger the concurrent loops necessary to handle message delivery.

We recommend using the managed lifecycle, unless you've got a lot of time to debug.

If you're dead set on skipping the lifecycle, check out agent_pipe.go for an example of how you can do the management yourself.

Examples

We have implemented and will continue to add agents to the agents directory. These are excellent references for how we have implemented and understand our own API.

We recommend your start with looking at our ping pong agents: ping.go, pong.go

These can be executed against a blackboard using

core start server # start the blackboard
core start ping   # start the ping agent
core start pong   # start the pong agent

More advanced examples can be found in the network benchmark, and agent_pipe.go

Please check out the "basic agent" implementation if you need manage agent behavior using even lower-level API elements (out of scope for this documentation).

Documentation

Overview

The transporters labeled "core" are our recommended defaults for most users. They lean permissive rather than safe. Essentially they are trusting of their environment and the messages they can receive. Blackboards also assume agents are not actively hostile to the blackboard itself.

Core HTTP transporter utilizes the transactional HTTPReadTransporter interface. This makes agents based on them suitable for applications where polling the blackboard for new messages makes sense. This is also simpler and expected to work in most if not all environments.

Core HTTP Stream transporter is theoretically a substantial performance improvement and utilized the now mostly common long-lived HTTP connection to stream data back to the client without having to reconnect for each chunk of messages. The stream utilizes HTTP "server sent events" to transmit data back to the client.

Server sent events have the limitation that they are text-based and not a generally good fit for binary data. Thus protojson converts everything to base-64 which incurs a substantial data size performance hit. This will be replaced in the near future with a more apropriate choice for binary data (socket/websocket most likely, however likely to still use protobuffers as our encoding scheme).

You'll note that core interfaces all enable tracing. Although this is not a strict requirement for a blackboard system, we consider it to be a fundamental and unique feature of core.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CompressBytes

func CompressBytes(data []byte) ([]byte, error)

CompressBytes performs zlib compression on a byte array.

func DecompressBytes

func DecompressBytes(data []byte) ([]byte, error)

DecompressBytes decompresses an incoming byte array with zlib compression.

func DeserializeBytesToFile

func DeserializeBytesToFile(data []byte, save_path string) error

DeserializeBytesToFile writes an incoming byte array to a file.

func DeserializeBytesToFileCompressed

func DeserializeBytesToFileCompressed(data_comp []byte, save_path string) error

DeserializeBytesToFileCompressed writes an incoming byte array with zlib compression to a file.

func DeserializeBytesToGonum

func DeserializeBytesToGonum(data []byte) (*mat.Dense, error)

DeserializeBytesToGonum writes an incoming byte array to a Gonum dense matrix.

func DeserializeBytesToGonumCompressed

func DeserializeBytesToGonumCompressed(data []byte) (*mat.Dense, error)

DeserializeBytesToGonumCompressed writes an incoming zlib-compressed byte array to a Gonum dense matrix.

func DeserializeToBinaryBase64

func DeserializeToBinaryBase64(data_encoded string) ([]byte, error)

DeserializeToBinary encodes a data string to bytes

func DeserializeToFileBase64

func DeserializeToFileBase64(data_encoded string, save_path string) error

DeserializeToBinary encodes a data string to a binary file

func GonumToNpy

func GonumToNpy(array *mat.Dense, save_path string) error

GonumToNpy converts a Gonum dense matrix to a numpy array npy file with underlying dtype float64.

func GonumToNumpyBytesIO

func GonumToNumpyBytesIO(array *mat.Dense) ([]byte, error)

GonumToNumpyBytesIO converts a Gonum dense matrix to a numpy array with underlying dtype float64 encoded as a byte array.

func NewCoreAgent added in v0.2.6

func NewCoreAgent(c CoreTransporter) *coreAgent

func NpyToGonum

func NpyToGonum(path string) (*mat.Dense, error)

NpyToGonum converts a .npy file to a Gonum dense matrix.

func NumpyBytesIOtoGonum

func NumpyBytesIOtoGonum(data []byte) (*mat.Dense, error)

NumpyBytesIOtoGonum converts a serialized Numpy array (as read from a BytesIO Python object) to a Gonum dense matrix.

NOTE: The underlying dtype of the numpy array must be float64 to be compatible with Gonum as a dense matrix. Users must keep this in mind if they want Python and Go agents to communicate matrix data directly.

func PackContext

func PackContext(path string) ([]byte, error)

PackContext tars the contents of a directory.

func ParseSSE added in v0.2.6

func ParseSSE(incoming string) *core.Message

Parse decodes an incoming string

func ResolveCoreData

func ResolveCoreData(msg *core.CoreData) ([]byte, error)

ResolveCoreData extracts data from a core.CoreData object.

I don't foresee users needing to call this directly, but I also don't see a good reason to keep it private? That being said, it's not intended to be a well-maintained part of the public interface.

func RunUntilComplete

func RunUntilComplete(agt ManagedAgent) error

RunUntilComplete runs the Setup and Loop functions for a ManagedAgent until the agent terminates. this function is responsible for any asynchronous management. We do not accept an external context because then this exposes an uncertainty in the user-inteface layer as to who should be responsbile for the waitgroup and cancel_func. If we expose it, then the only way to guarantee no conflicts is to require users to set and manage it externally.

That just sucks though and creates a bad experience if users are unfamiliar with waitgroups and blah blah blah.

Anyways, that's a lot of words explaining that putting lifecycle management here including the cleanup and teardown is a very conscious decision. Any decision to move it should not be undertaken lightly and at minimum should be done without touching this function (i.e. add functionality rather than change).

RunUntilComplete is a very important function to our system. It sets the default lifecycle for agents and is our recommended approach. In many ways this mirrors normal cognitive systems that have hardcoded defaults. Strong deviations from the default come with risk and consequence, while the hardcoded tried-and-true pathways (that we have tested) allow users something somewhat more predictable. Even if an agent makes it through the startup and runs a few loops, there's no guarantee they'll get to call their shutdown function. This could harm the host system by overutilizing resources and harm the other agents on the system by aggressively competing with them for time on the blackboard.

This is a free function (public domain, unlicense) The ordering of operations found here and in the git history is free The conceptual implementation is free The go code is free The any derivative implementation we do is free Any derivative implementation others do should be unequivocally free, however recognition of this will vary

func SerializeFileToBytes

func SerializeFileToBytes(path string) ([]byte, error)

SerializeFileToBytes serializes a specified file to a byte array.

func SerializeFileToBytesCompressed

func SerializeFileToBytesCompressed(path string) ([]byte, error)

SerializeFileToBytesCompressed performs zlib compression on a specified file and serializes to a byte array.

func SerializeFromBinaryBase64

func SerializeFromBinaryBase64(data []byte) string

SerializeFromBinary encodes a byte slice with base64 serialization.

func SerializeFromFileBase64

func SerializeFromFileBase64(filepath string) (string, error)

SerializeFromFile encodes a file with base64 serialization.

func SerializeGonumToBytes

func SerializeGonumToBytes(array *mat.Dense) ([]byte, error)

SerializeGonumToBytes wraps mat.Dense.MarshalBinary.

func SerializeGonumToBytesCompressed

func SerializeGonumToBytesCompressed(array *mat.Dense) ([]byte, error)

SerializeGonumToBytes performs zlib compression on a mat.Dense object and serializes to bytes.

func UnpackContext

func UnpackContext(root_dir string, archived_context []byte) error

Unpacks the contents of an archive into root_dir. root_dir is created if it does not already exist.

Types

type AgentCreator added in v0.2.6

type AgentCreator interface {
	CreateAgent(spec *core.AgentSpec) error
}

type BlackboardTransporter

type BlackboardTransporter CoreTransporter

This is "duplicated" code (it's just the blackboard interface) but we need to avoid the import cycle. And the function here is different (allowing our agents to utilize a standard interface to different transport code.)

type Controller added in v0.2.6

type Controller interface {
	Halt() error
	HaltAndCatchFire() error
}

type CoreAgent added in v0.2.6

The CoreAgent interface is what we consider a good starting set of functionality for most agents. Core is permissive, and thus we allow AgentCreator

func GetDefaultAgent

func GetDefaultAgent(bb_addr string) CoreAgent

type CoreAgentCreator added in v0.2.6

type CoreAgentCreator struct {
	WriteTransporter
}

func NewCoreAgentCreator added in v0.2.6

func NewCoreAgentCreator(w WriteTransporter) *CoreAgentCreator

func (*CoreAgentCreator) CreateAgent added in v0.2.6

func (c *CoreAgentCreator) CreateAgent(spec *core.AgentSpec) error

TODO: should we expose a way to configure the runner? (see commented line) by and large, agents shouldn't worry too much about how they get run. By virtue of being posted to the blackboard, we can trust that something will try and run them.

type CoreController added in v0.2.6

type CoreController struct {
	WriteTransporter
}

func NewCoreController added in v0.2.6

func NewCoreController(w WriteTransporter) *CoreController

func (*CoreController) Halt added in v0.2.6

func (c *CoreController) Halt() error

func (*CoreController) HaltAndCatchFire added in v0.2.6

func (c *CoreController) HaltAndCatchFire() error

type CoreGreeter added in v0.2.6

type CoreGreeter struct {
	WriteTransporter
}

func NewCoreGreeter added in v0.2.6

func NewCoreGreeter(w WriteTransporter) *CoreGreeter

func (*CoreGreeter) Goodbye added in v0.2.6

func (c *CoreGreeter) Goodbye() error

func (*CoreGreeter) Hello added in v0.2.6

func (c *CoreGreeter) Hello() error

type CoreHTTPStreamAdapter added in v0.2.6

type CoreHTTPStreamAdapter struct {
	*http.Client
	*HTTPReadTransporter
	*HTTPWriteTransporter
	*HTTPTraceTransporter
	// contains filtered or unexported fields
}

func NewCoreHTTPStreamAdapter added in v0.2.6

func NewCoreHTTPStreamAdapter(bb_addr string, c *http.Client) *CoreHTTPStreamAdapter

type CoreHTTPTransporter added in v0.2.6

type CoreHTTPTransporter struct {
	*http.Client
	*HTTPStreamTransporter
	*HTTPWriteTransporter
	*HTTPTraceTransporter
	// contains filtered or unexported fields
}

func NewCoreHTTPTransporter added in v0.2.6

func NewCoreHTTPTransporter(bb_addr string, c *http.Client) *CoreHTTPTransporter

type CoreListener added in v0.2.6

type CoreListener struct {
	StreamTransporter
	// contains filtered or unexported fields
}

func NewCoreListener added in v0.2.6

func NewCoreListener(s StreamTransporter) *CoreListener

func (*CoreListener) ListenFor added in v0.2.6

func (c *CoreListener) ListenFor(tags []string) <-chan *Post

ListenFor adds tag filters to the Listener's filter chain. For guaranteed behavaior, you *must* put your calls to ListenFor prior to StartListening. Any filters added after the call to start listening will only process new incoming messages (although this dynamic adding of filter should be supported)

func (*CoreListener) ListenForCreateAgent added in v0.2.6

func (c *CoreListener) ListenForCreateAgent() <-chan *CreateAgent

func (*CoreListener) ListenForHalt added in v0.2.6

func (c *CoreListener) ListenForHalt() <-chan struct{}

func (*CoreListener) ListenForHello added in v0.2.6

func (c *CoreListener) ListenForHello() <-chan string

func (*CoreListener) StartListening added in v0.2.6

func (c *CoreListener) StartListening(ctx context.Context) error

type CoreLogger added in v0.2.6

type CoreLogger struct {
	WriteTransporter
}

A core logger must be able to write to blackboard

func NewCoreLogger added in v0.2.6

func NewCoreLogger(w WriteTransporter) *CoreLogger

func (*CoreLogger) Log added in v0.2.6

func (c *CoreLogger) Log(sev core.Log_Severity, msg string) error

type CorePoster added in v0.2.6

type CorePoster struct {
	WriteTransporter
}

func NewCorePoster added in v0.2.6

func NewCorePoster(w WriteTransporter) *CorePoster

func (*CorePoster) Post added in v0.2.6

func (c *CorePoster) Post(metadata []byte, data []byte, tags ...string) error

func (*CorePoster) Reply added in v0.2.6

func (c *CorePoster) Reply(posts []*Post, metadata, data []byte, tags ...string) error

type CoreTracer added in v0.2.6

type CoreTracer struct {
	TraceTransporter
}

A core logger must be able to write to blackboard

func NewCoreTracer added in v0.2.6

func NewCoreTracer(w TraceTransporter) *CoreTracer

func (*CoreTracer) Trace added in v0.2.6

func (c *CoreTracer) Trace(post *Post) ([]*Post, error)

type CoreTransporter added in v0.2.6

type CoreTransporter interface {
	StreamTransporter
	WriteTransporter
	TraceTransporter
}

Our recommended functionality for most agent

type CreateAgent added in v0.2.6

type CreateAgent struct {
	Runner     string // Who should run the agent
	Name       string // What should the agent tell everyone its name is
	Entrypoint string // What "command" should the runner use to start the agent (this is likely to be specific to the type of runner executing Athe agent)
	Attributes []byte // Data blob for passing parameters
	Context    []byte // Data blob for passing materials the agent may need to do its job (like a tar file; also likely to be specific to runner)
}

type CreateAgentFilter added in v0.2.6

type CreateAgentFilter struct {
	// contains filtered or unexported fields
}

func (*CreateAgentFilter) Filter added in v0.2.6

func (c *CreateAgentFilter) Filter(msg *core.Message) error

type Filter

type Filter interface {
	Filter(msg *core.Message) error
}

Filters and FilterChains process incoming channel messages and surface them based on their specific filter implementation. I think, subconsciously, this ideas is loosely based on firewall filters/rules. NOTE: Might be a better name for this stuff

type FilterChain

type FilterChain struct {
	// contains filtered or unexported fields
}

A filter chain provides a convenient way to surface a message accross multiple potential matches. NOTE: FilterChain should probably become an interface sooner rather than later to allow for different implementations of chain behavior.

func NewFilterChain

func NewFilterChain() *FilterChain

NewFilterChain is the constructor for the FilterChain type.

func (*FilterChain) Cleanup

func (f *FilterChain) Cleanup()

Cleanup closes the channel of each filter in the chain.

func (*FilterChain) Insert

func (f *FilterChain) Insert(filt Filter)

Insert wraps append, acting as a set method for the `filters` field.

func (*FilterChain) Len added in v0.2.6

func (f *FilterChain) Len() int

func (*FilterChain) ListenAndFilter

func (f *FilterChain) ListenAndFilter(ctx context.Context, incoming chan *core.Message)

ListenAndFilter filters an incoming message based on the filters in the chain.

type GoodListener added in v0.2.6

type GoodListener interface {
	Listener
	ListenForCreateAgent() <-chan *CreateAgent
	ListenForHalt() <-chan struct{} // Should this instead surface the source of the halt rather than just a signal?
	ListenForHello() <-chan string
}

type Greeter added in v0.2.6

type Greeter interface {
	Hello() error
	Goodbye() error
}

type HTTPReadTransporter added in v0.2.6

type HTTPReadTransporter struct {
	*http.Client

	StartIdx int
	// contains filtered or unexported fields
}

NOTE: the http.Client should be reused across http transporters as much as possible. This will facilitate reusing open/cached TCP state, etc. By design, it must be passed, even if only a zero value (which is a valid client `go doc http.Client`).

func NewHTTPReadTransporter added in v0.2.6

func NewHTTPReadTransporter(bb_addr string, c *http.Client) *HTTPReadTransporter

func (*HTTPReadTransporter) Len added in v0.2.6

func (h *HTTPReadTransporter) Len() (int, error)

Get the current length of the blackboard. Efficient agent joining requires this.

func (*HTTPReadTransporter) Read added in v0.2.6

func (h *HTTPReadTransporter) Read(msg_idx int) (*core.Message, error)

TODO: needs an http client TODO: needs a slicer? There's really no such thg as "reading" is there? Only slicing.

func (*HTTPReadTransporter) Slice added in v0.2.6

func (h *HTTPReadTransporter) Slice(start_idx, end_idx int) ([]*core.Message, error)

func (*HTTPReadTransporter) Stream added in v0.2.6

func (h *HTTPReadTransporter) Stream(ctx context.Context) (chan *core.Message, error)

This is a hack. We essentially make it look like we're producing a stream a MessageStack however we're really just polling and writing them into a channel. This sets the stage to move to a model where all messages arrive to clients in a "stream" (producer/consumer)

Stream produces all messages to the agent for the agent to handle filtering

type HTTPStreamTransporter added in v0.2.6

type HTTPStreamTransporter struct {
	*http.Client
	// contains filtered or unexported fields
}

StreamTransporter provides a stream of messages from the blackboard as served sent events. This is a relatively naive form of streaming.

func NewHTTPStreamTransporter added in v0.2.6

func NewHTTPStreamTransporter(bb_addr string, c *http.Client) *HTTPStreamTransporter

func (*HTTPStreamTransporter) Stream added in v0.2.6

func (h *HTTPStreamTransporter) Stream(ctx context.Context) (chan *core.Message, error)

Stream creates and returns a channel on which messages will arrive from the blackboard

type HTTPTraceTransporter added in v0.2.6

type HTTPTraceTransporter struct {
	*http.Client
	// contains filtered or unexported fields
}

func NewHTTPTraceTransporter added in v0.2.6

func NewHTTPTraceTransporter(bb_addr string, c *http.Client) *HTTPTraceTransporter

func (*HTTPTraceTransporter) Trace added in v0.2.6

func (h *HTTPTraceTransporter) Trace(msg_idx int, depth int) ([]*core.Message, error)

type HTTPWriteTransporter added in v0.2.6

type HTTPWriteTransporter struct {
	*http.Client
	// contains filtered or unexported fields
}

func NewHTTPWriteTransporter added in v0.2.6

func NewHTTPWriteTransporter(bb_addr string, c *http.Client) *HTTPWriteTransporter

func (*HTTPWriteTransporter) GetName added in v0.2.6

func (h *HTTPWriteTransporter) GetName() string

func (*HTTPWriteTransporter) SetName added in v0.2.6

func (h *HTTPWriteTransporter) SetName(name string)

func (*HTTPWriteTransporter) Write added in v0.2.6

func (h *HTTPWriteTransporter) Write(msg *core.Message) error

type HaltFilter added in v0.2.6

type HaltFilter struct {
	// contains filtered or unexported fields
}

func (*HaltFilter) Filter added in v0.2.6

func (h *HaltFilter) Filter(msg *core.Message) error

type HelloFilter added in v0.2.6

type HelloFilter struct {
	// contains filtered or unexported fields
}

func (*HelloFilter) Filter added in v0.2.6

func (h *HelloFilter) Filter(msg *core.Message) error

type IdentifiableAgent added in v0.2.6

type IdentifiableAgent interface {
	SetName(string)
	GetName() string
}

type Listener added in v0.2.6

type Listener interface {
	StartListening(ctx context.Context) error
	ListenFor(tags []string) <-chan *Post
}

These are the minimal agent interfaces we want to specify These describe the general functionality we want our users to directly interact with. I.e. users are intended to be developing at the "agent" level, and core handles the communication/transit level.

the more abstract composed interfaces are intended as semi-official recommendations for functionality that should be implemented for certain desired agent behaviors. Anything prefixed with "core" is our recommended default, which skews permissive and easy-to-use rather than secure. Please keep that in mind for your application.

type Logger added in v0.2.6

type Logger interface {
	Log(sev core.Log_Severity, msg string) error
}

type ManagedAgent

type ManagedAgent interface {
	CoreAgent
	Setup() error
	Loop(ctx context.Context) (continue_loop bool, err error)
}

ManagedAgent is an interface enforcing the agent-defined Setup and Loop methods and SetName (should be handled by the executable).

Free the agents: keep this interface as narrow as possible

type Post

type Post struct {
	Tags     []string
	Metadata []byte
	Data     []byte
	// contains filtered or unexported fields
}

Post is a user-facing "Post" type that abstracts the fields of core.Post

func PostFromPostMessage

func PostFromPostMessage(msg_envelope *core.Message) (*Post, error)

PostFromPostMessage converts a core.Post message to the user-facing Post type.

Ditto for this one about the public facing interface and all We need to surface the potential http errors to troubleshoot if object store is unavailable.

func (*Post) Hash

func (p *Post) Hash() string

Hash produces an md5-based hash by concatenating the md5 checksum of the metadata, data, and tags of the Post object. At present, this has no relation to the "hash" stdlib.

func (*Post) ReplyIndex added in v0.2.6

func (p *Post) ReplyIndex() int

func (*Post) Source added in v0.2.6

func (p *Post) Source() string

type Poster added in v0.2.6

type Poster interface {
	Post(metadata []byte, data []byte, tags ...string) error
	Reply(posts []*Post, metadata, data []byte, tags ...string) error
}

type ReadTransporter added in v0.2.6

type ReadTransporter interface {
	Read(msg_idx int) (*core.Message, error)
	Len() (int, error)
	Slice(start_idx, end_idx int) ([]*core.Message, error)
}

Readers are able to read a single message. Will not be supported by all clients or blackboards. Required for a polling-style agent

type Replyable added in v0.2.6

type Replyable interface {
	ReplyIndex() int
}

type Sourceable added in v0.2.6

type Sourceable interface {
	Source() string
}

type StreamTransporter added in v0.2.6

type StreamTransporter interface {
	Stream(ctx context.Context) (chan *core.Message, error)
}

A transporter than can provide the blackboard messages via a long-lived connection to the server (or something that mimics it)

type TagFilter

type TagFilter struct {
	TagSet
	// contains filtered or unexported fields
}

A TagFilter filters messages using the TagSet's Match function. If a message passes the filter, it is written into the associated channel.

func (*TagFilter) Filter

func (f *TagFilter) Filter(msg *core.Message) error

Filter filters post messages based on a tagset.

type TagSet

type TagSet map[string]struct{}

TagSet is a type alias for map[string]struct{}, which essentially mimics a set of tag string values.

func NewTagSet

func NewTagSet(tags []string) TagSet

NewTagSet generates a TagSet from a slice. This will have the effect of removing any duplicate tags from the incoming slice.

func (TagSet) Matches

func (t TagSet) Matches(incoming TagSet) bool

In general, we don't care what "Matches" actually means that much, just that it is definedby this function. However, in THIS implementation, it means the receiver tag set is a subset of the incoming tag set. E.g., if I am ListenFor(TagSet{"lung", "segmentation"}), I would return any message that contains *at least* these two tags. Other implementations may have stronger (or weaker) notions of a match.

func (TagSet) String

func (t TagSet) String() string

Fulfill the stringer interface

type TraceTransporter added in v0.2.6

type TraceTransporter interface {
	Trace(msg_idx int, depth int) ([]*core.Message, error)
}

type Tracer added in v0.2.6

type Tracer interface {
	Trace(post *Post) ([]*Post, error)
}

type WriteTransporter added in v0.2.6

type WriteTransporter interface {
	IdentifiableAgent
	Write(msg *core.Message) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳