Documentation
¶
Overview ¶
The transporters labeled "core" are our recommended defaults for most users. They lean permissive rather than safe. Essentially they are trusting of their environment and the messages they can receive. Blackboards also assume agents are not actively hostile to the blackboard itself.
Core HTTP transporter utilizes the transactional HTTPReadTransporter interface. This makes agents based on them suitable for applications where polling the blackboard for new messages makes sense. This is also simpler and expected to work in most if not all environments.
Core HTTP Stream transporter is theoretically a substantial performance improvement and utilized the now mostly common long-lived HTTP connection to stream data back to the client without having to reconnect for each chunk of messages. The stream utilizes HTTP "server sent events" to transmit data back to the client.
Server sent events have the limitation that they are text-based and not a generally good fit for binary data. Thus protojson converts everything to base-64 which incurs a substantial data size performance hit. This will be replaced in the near future with a more apropriate choice for binary data (socket/websocket most likely, however likely to still use protobuffers as our encoding scheme).
You'll note that core interfaces all enable tracing. Although this is not a strict requirement for a blackboard system, we consider it to be a fundamental and unique feature of core.
Index ¶
- func CompressBytes(data []byte) ([]byte, error)
- func DecompressBytes(data []byte) ([]byte, error)
- func DeserializeBytesToFile(data []byte, save_path string) error
- func DeserializeBytesToFileCompressed(data_comp []byte, save_path string) error
- func DeserializeBytesToGonum(data []byte) (*mat.Dense, error)
- func DeserializeBytesToGonumCompressed(data []byte) (*mat.Dense, error)
- func DeserializeToBinaryBase64(data_encoded string) ([]byte, error)
- func DeserializeToFileBase64(data_encoded string, save_path string) error
- func GonumToNpy(array *mat.Dense, save_path string) error
- func GonumToNumpyBytesIO(array *mat.Dense) ([]byte, error)
- func NewCoreAgent(c CoreTransporter) *coreAgent
- func NpyToGonum(path string) (*mat.Dense, error)
- func NumpyBytesIOtoGonum(data []byte) (*mat.Dense, error)
- func PackContext(path string) ([]byte, error)
- func ParseSSE(incoming string) *core.Message
- func ResolveCoreData(msg *core.CoreData) ([]byte, error)
- func RunUntilComplete(agt ManagedAgent) error
- func SerializeFileToBytes(path string) ([]byte, error)
- func SerializeFileToBytesCompressed(path string) ([]byte, error)
- func SerializeFromBinaryBase64(data []byte) string
- func SerializeFromFileBase64(filepath string) (string, error)
- func SerializeGonumToBytes(array *mat.Dense) ([]byte, error)
- func SerializeGonumToBytesCompressed(array *mat.Dense) ([]byte, error)
- func UnpackContext(root_dir string, archived_context []byte) error
- type AgentCreator
- type BlackboardTransporter
- type Controller
- type CoreAgent
- type CoreAgentCreator
- type CoreController
- type CoreGreeter
- type CoreHTTPStreamAdapter
- type CoreHTTPTransporter
- type CoreListener
- func (c *CoreListener) ListenFor(tags []string) <-chan *Post
- func (c *CoreListener) ListenForCreateAgent() <-chan *CreateAgent
- func (c *CoreListener) ListenForHalt() <-chan struct{}
- func (c *CoreListener) ListenForHello() <-chan string
- func (c *CoreListener) StartListening(ctx context.Context) error
- type CoreLogger
- type CorePoster
- type CoreTracer
- type CoreTransporter
- type CreateAgent
- type CreateAgentFilter
- type Filter
- type FilterChain
- type GoodListener
- type Greeter
- type HTTPReadTransporter
- type HTTPStreamTransporter
- type HTTPTraceTransporter
- type HTTPWriteTransporter
- type HaltFilter
- type HelloFilter
- type IdentifiableAgent
- type Listener
- type Logger
- type ManagedAgent
- type Post
- type Poster
- type ReadTransporter
- type Replyable
- type Sourceable
- type StreamTransporter
- type TagFilter
- type TagSet
- type TraceTransporter
- type Tracer
- type WriteTransporter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CompressBytes ¶
CompressBytes performs zlib compression on a byte array.
func DecompressBytes ¶
DecompressBytes decompresses an incoming byte array with zlib compression.
func DeserializeBytesToFile ¶
DeserializeBytesToFile writes an incoming byte array to a file.
func DeserializeBytesToFileCompressed ¶
DeserializeBytesToFileCompressed writes an incoming byte array with zlib compression to a file.
func DeserializeBytesToGonum ¶
DeserializeBytesToGonum writes an incoming byte array to a Gonum dense matrix.
func DeserializeBytesToGonumCompressed ¶
DeserializeBytesToGonumCompressed writes an incoming zlib-compressed byte array to a Gonum dense matrix.
func DeserializeToBinaryBase64 ¶
DeserializeToBinary encodes a data string to bytes
func DeserializeToFileBase64 ¶
DeserializeToBinary encodes a data string to a binary file
func GonumToNpy ¶
GonumToNpy converts a Gonum dense matrix to a numpy array npy file with underlying dtype float64.
func GonumToNumpyBytesIO ¶
GonumToNumpyBytesIO converts a Gonum dense matrix to a numpy array with underlying dtype float64 encoded as a byte array.
func NewCoreAgent ¶ added in v0.2.6
func NewCoreAgent(c CoreTransporter) *coreAgent
func NpyToGonum ¶
NpyToGonum converts a .npy file to a Gonum dense matrix.
func NumpyBytesIOtoGonum ¶
NumpyBytesIOtoGonum converts a serialized Numpy array (as read from a BytesIO Python object) to a Gonum dense matrix.
NOTE: The underlying dtype of the numpy array must be float64 to be compatible with Gonum as a dense matrix. Users must keep this in mind if they want Python and Go agents to communicate matrix data directly.
func PackContext ¶
PackContext tars the contents of a directory.
func ResolveCoreData ¶
ResolveCoreData extracts data from a core.CoreData object.
I don't foresee users needing to call this directly, but I also don't see a good reason to keep it private? That being said, it's not intended to be a well-maintained part of the public interface.
func RunUntilComplete ¶
func RunUntilComplete(agt ManagedAgent) error
RunUntilComplete runs the Setup and Loop functions for a ManagedAgent until the agent terminates. this function is responsible for any asynchronous management. We do not accept an external context because then this exposes an uncertainty in the user-inteface layer as to who should be responsbile for the waitgroup and cancel_func. If we expose it, then the only way to guarantee no conflicts is to require users to set and manage it externally.
That just sucks though and creates a bad experience if users are unfamiliar with waitgroups and blah blah blah.
Anyways, that's a lot of words explaining that putting lifecycle management here including the cleanup and teardown is a very conscious decision. Any decision to move it should not be undertaken lightly and at minimum should be done without touching this function (i.e. add functionality rather than change).
RunUntilComplete is a very important function to our system. It sets the default lifecycle for agents and is our recommended approach. In many ways this mirrors normal cognitive systems that have hardcoded defaults. Strong deviations from the default come with risk and consequence, while the hardcoded tried-and-true pathways (that we have tested) allow users something somewhat more predictable. Even if an agent makes it through the startup and runs a few loops, there's no guarantee they'll get to call their shutdown function. This could harm the host system by overutilizing resources and harm the other agents on the system by aggressively competing with them for time on the blackboard.
This is a free function (public domain, unlicense) The ordering of operations found here and in the git history is free The conceptual implementation is free The go code is free The any derivative implementation we do is free Any derivative implementation others do should be unequivocally free, however recognition of this will vary
func SerializeFileToBytes ¶
SerializeFileToBytes serializes a specified file to a byte array.
func SerializeFileToBytesCompressed ¶
SerializeFileToBytesCompressed performs zlib compression on a specified file and serializes to a byte array.
func SerializeFromBinaryBase64 ¶
SerializeFromBinary encodes a byte slice with base64 serialization.
func SerializeFromFileBase64 ¶
SerializeFromFile encodes a file with base64 serialization.
func SerializeGonumToBytes ¶
SerializeGonumToBytes wraps mat.Dense.MarshalBinary.
func SerializeGonumToBytesCompressed ¶
SerializeGonumToBytes performs zlib compression on a mat.Dense object and serializes to bytes.
func UnpackContext ¶
Unpacks the contents of an archive into root_dir. root_dir is created if it does not already exist.
Types ¶
type AgentCreator ¶ added in v0.2.6
type BlackboardTransporter ¶
type BlackboardTransporter CoreTransporter
This is "duplicated" code (it's just the blackboard interface) but we need to avoid the import cycle. And the function here is different (allowing our agents to utilize a standard interface to different transport code.)
type Controller ¶ added in v0.2.6
type CoreAgent ¶ added in v0.2.6
type CoreAgent interface { IdentifiableAgent Greeter GoodListener Poster Logger Tracer AgentCreator }
The CoreAgent interface is what we consider a good starting set of functionality for most agents. Core is permissive, and thus we allow AgentCreator
func GetDefaultAgent ¶
type CoreAgentCreator ¶ added in v0.2.6
type CoreAgentCreator struct {
WriteTransporter
}
func NewCoreAgentCreator ¶ added in v0.2.6
func NewCoreAgentCreator(w WriteTransporter) *CoreAgentCreator
func (*CoreAgentCreator) CreateAgent ¶ added in v0.2.6
func (c *CoreAgentCreator) CreateAgent(spec *core.AgentSpec) error
TODO: should we expose a way to configure the runner? (see commented line) by and large, agents shouldn't worry too much about how they get run. By virtue of being posted to the blackboard, we can trust that something will try and run them.
type CoreController ¶ added in v0.2.6
type CoreController struct {
WriteTransporter
}
func NewCoreController ¶ added in v0.2.6
func NewCoreController(w WriteTransporter) *CoreController
func (*CoreController) Halt ¶ added in v0.2.6
func (c *CoreController) Halt() error
func (*CoreController) HaltAndCatchFire ¶ added in v0.2.6
func (c *CoreController) HaltAndCatchFire() error
type CoreGreeter ¶ added in v0.2.6
type CoreGreeter struct {
WriteTransporter
}
func NewCoreGreeter ¶ added in v0.2.6
func NewCoreGreeter(w WriteTransporter) *CoreGreeter
func (*CoreGreeter) Goodbye ¶ added in v0.2.6
func (c *CoreGreeter) Goodbye() error
func (*CoreGreeter) Hello ¶ added in v0.2.6
func (c *CoreGreeter) Hello() error
type CoreHTTPStreamAdapter ¶ added in v0.2.6
type CoreHTTPStreamAdapter struct { *http.Client *HTTPReadTransporter *HTTPWriteTransporter *HTTPTraceTransporter // contains filtered or unexported fields }
func NewCoreHTTPStreamAdapter ¶ added in v0.2.6
func NewCoreHTTPStreamAdapter(bb_addr string, c *http.Client) *CoreHTTPStreamAdapter
type CoreHTTPTransporter ¶ added in v0.2.6
type CoreHTTPTransporter struct { *http.Client *HTTPStreamTransporter *HTTPWriteTransporter *HTTPTraceTransporter // contains filtered or unexported fields }
func NewCoreHTTPTransporter ¶ added in v0.2.6
func NewCoreHTTPTransporter(bb_addr string, c *http.Client) *CoreHTTPTransporter
type CoreListener ¶ added in v0.2.6
type CoreListener struct { StreamTransporter // contains filtered or unexported fields }
func NewCoreListener ¶ added in v0.2.6
func NewCoreListener(s StreamTransporter) *CoreListener
func (*CoreListener) ListenFor ¶ added in v0.2.6
func (c *CoreListener) ListenFor(tags []string) <-chan *Post
ListenFor adds tag filters to the Listener's filter chain. For guaranteed behavaior, you *must* put your calls to ListenFor prior to StartListening. Any filters added after the call to start listening will only process new incoming messages (although this dynamic adding of filter should be supported)
func (*CoreListener) ListenForCreateAgent ¶ added in v0.2.6
func (c *CoreListener) ListenForCreateAgent() <-chan *CreateAgent
func (*CoreListener) ListenForHalt ¶ added in v0.2.6
func (c *CoreListener) ListenForHalt() <-chan struct{}
func (*CoreListener) ListenForHello ¶ added in v0.2.6
func (c *CoreListener) ListenForHello() <-chan string
func (*CoreListener) StartListening ¶ added in v0.2.6
func (c *CoreListener) StartListening(ctx context.Context) error
type CoreLogger ¶ added in v0.2.6
type CoreLogger struct {
WriteTransporter
}
A core logger must be able to write to blackboard
func NewCoreLogger ¶ added in v0.2.6
func NewCoreLogger(w WriteTransporter) *CoreLogger
func (*CoreLogger) Log ¶ added in v0.2.6
func (c *CoreLogger) Log(sev core.Log_Severity, msg string) error
type CorePoster ¶ added in v0.2.6
type CorePoster struct {
WriteTransporter
}
func NewCorePoster ¶ added in v0.2.6
func NewCorePoster(w WriteTransporter) *CorePoster
type CoreTracer ¶ added in v0.2.6
type CoreTracer struct {
TraceTransporter
}
A core logger must be able to write to blackboard
func NewCoreTracer ¶ added in v0.2.6
func NewCoreTracer(w TraceTransporter) *CoreTracer
type CoreTransporter ¶ added in v0.2.6
type CoreTransporter interface { StreamTransporter WriteTransporter TraceTransporter }
Our recommended functionality for most agent
type CreateAgent ¶ added in v0.2.6
type CreateAgent struct { Runner string // Who should run the agent Name string // What should the agent tell everyone its name is Entrypoint string // What "command" should the runner use to start the agent (this is likely to be specific to the type of runner executing Athe agent) Attributes []byte // Data blob for passing parameters Context []byte // Data blob for passing materials the agent may need to do its job (like a tar file; also likely to be specific to runner) }
type CreateAgentFilter ¶ added in v0.2.6
type CreateAgentFilter struct {
// contains filtered or unexported fields
}
type Filter ¶
Filters and FilterChains process incoming channel messages and surface them based on their specific filter implementation. I think, subconsciously, this ideas is loosely based on firewall filters/rules. NOTE: Might be a better name for this stuff
type FilterChain ¶
type FilterChain struct {
// contains filtered or unexported fields
}
A filter chain provides a convenient way to surface a message accross multiple potential matches. NOTE: FilterChain should probably become an interface sooner rather than later to allow for different implementations of chain behavior.
func NewFilterChain ¶
func NewFilterChain() *FilterChain
NewFilterChain is the constructor for the FilterChain type.
func (*FilterChain) Cleanup ¶
func (f *FilterChain) Cleanup()
Cleanup closes the channel of each filter in the chain.
func (*FilterChain) Insert ¶
func (f *FilterChain) Insert(filt Filter)
Insert wraps append, acting as a set method for the `filters` field.
func (*FilterChain) Len ¶ added in v0.2.6
func (f *FilterChain) Len() int
func (*FilterChain) ListenAndFilter ¶
func (f *FilterChain) ListenAndFilter(ctx context.Context, incoming chan *core.Message)
ListenAndFilter filters an incoming message based on the filters in the chain.
type GoodListener ¶ added in v0.2.6
type GoodListener interface { Listener ListenForCreateAgent() <-chan *CreateAgent ListenForHalt() <-chan struct{} // Should this instead surface the source of the halt rather than just a signal? ListenForHello() <-chan string }
type HTTPReadTransporter ¶ added in v0.2.6
type HTTPReadTransporter struct { *http.Client StartIdx int // contains filtered or unexported fields }
NOTE: the http.Client should be reused across http transporters as much as possible. This will facilitate reusing open/cached TCP state, etc. By design, it must be passed, even if only a zero value (which is a valid client `go doc http.Client`).
func NewHTTPReadTransporter ¶ added in v0.2.6
func NewHTTPReadTransporter(bb_addr string, c *http.Client) *HTTPReadTransporter
func (*HTTPReadTransporter) Len ¶ added in v0.2.6
func (h *HTTPReadTransporter) Len() (int, error)
Get the current length of the blackboard. Efficient agent joining requires this.
func (*HTTPReadTransporter) Read ¶ added in v0.2.6
func (h *HTTPReadTransporter) Read(msg_idx int) (*core.Message, error)
TODO: needs an http client TODO: needs a slicer? There's really no such thg as "reading" is there? Only slicing.
func (*HTTPReadTransporter) Slice ¶ added in v0.2.6
func (h *HTTPReadTransporter) Slice(start_idx, end_idx int) ([]*core.Message, error)
func (*HTTPReadTransporter) Stream ¶ added in v0.2.6
This is a hack. We essentially make it look like we're producing a stream a MessageStack however we're really just polling and writing them into a channel. This sets the stage to move to a model where all messages arrive to clients in a "stream" (producer/consumer)
Stream produces all messages to the agent for the agent to handle filtering
type HTTPStreamTransporter ¶ added in v0.2.6
StreamTransporter provides a stream of messages from the blackboard as served sent events. This is a relatively naive form of streaming.
func NewHTTPStreamTransporter ¶ added in v0.2.6
func NewHTTPStreamTransporter(bb_addr string, c *http.Client) *HTTPStreamTransporter
type HTTPTraceTransporter ¶ added in v0.2.6
func NewHTTPTraceTransporter ¶ added in v0.2.6
func NewHTTPTraceTransporter(bb_addr string, c *http.Client) *HTTPTraceTransporter
type HTTPWriteTransporter ¶ added in v0.2.6
func NewHTTPWriteTransporter ¶ added in v0.2.6
func NewHTTPWriteTransporter(bb_addr string, c *http.Client) *HTTPWriteTransporter
func (*HTTPWriteTransporter) GetName ¶ added in v0.2.6
func (h *HTTPWriteTransporter) GetName() string
func (*HTTPWriteTransporter) SetName ¶ added in v0.2.6
func (h *HTTPWriteTransporter) SetName(name string)
type HaltFilter ¶ added in v0.2.6
type HaltFilter struct {
// contains filtered or unexported fields
}
type HelloFilter ¶ added in v0.2.6
type HelloFilter struct {
// contains filtered or unexported fields
}
type IdentifiableAgent ¶ added in v0.2.6
type Listener ¶ added in v0.2.6
type Listener interface { StartListening(ctx context.Context) error ListenFor(tags []string) <-chan *Post }
These are the minimal agent interfaces we want to specify These describe the general functionality we want our users to directly interact with. I.e. users are intended to be developing at the "agent" level, and core handles the communication/transit level.
the more abstract composed interfaces are intended as semi-official recommendations for functionality that should be implemented for certain desired agent behaviors. Anything prefixed with "core" is our recommended default, which skews permissive and easy-to-use rather than secure. Please keep that in mind for your application.
type Logger ¶ added in v0.2.6
type Logger interface {
Log(sev core.Log_Severity, msg string) error
}
type ManagedAgent ¶
type ManagedAgent interface { CoreAgent Setup() error Loop(ctx context.Context) (continue_loop bool, err error) }
ManagedAgent is an interface enforcing the agent-defined Setup and Loop methods and SetName (should be handled by the executable).
Free the agents: keep this interface as narrow as possible
type Post ¶
type Post struct { Tags []string Metadata []byte Data []byte // contains filtered or unexported fields }
Post is a user-facing "Post" type that abstracts the fields of core.Post
func PostFromPostMessage ¶
PostFromPostMessage converts a core.Post message to the user-facing Post type.
Ditto for this one about the public facing interface and all We need to surface the potential http errors to troubleshoot if object store is unavailable.
func (*Post) Hash ¶
Hash produces an md5-based hash by concatenating the md5 checksum of the metadata, data, and tags of the Post object. At present, this has no relation to the "hash" stdlib.
func (*Post) ReplyIndex ¶ added in v0.2.6
type ReadTransporter ¶ added in v0.2.6
type ReadTransporter interface { Read(msg_idx int) (*core.Message, error) Len() (int, error) Slice(start_idx, end_idx int) ([]*core.Message, error) }
Readers are able to read a single message. Will not be supported by all clients or blackboards. Required for a polling-style agent
type Sourceable ¶ added in v0.2.6
type Sourceable interface {
Source() string
}
type StreamTransporter ¶ added in v0.2.6
A transporter than can provide the blackboard messages via a long-lived connection to the server (or something that mimics it)
type TagFilter ¶
type TagFilter struct { TagSet // contains filtered or unexported fields }
A TagFilter filters messages using the TagSet's Match function. If a message passes the filter, it is written into the associated channel.
type TagSet ¶
type TagSet map[string]struct{}
TagSet is a type alias for map[string]struct{}, which essentially mimics a set of tag string values.
func NewTagSet ¶
NewTagSet generates a TagSet from a slice. This will have the effect of removing any duplicate tags from the incoming slice.
func (TagSet) Matches ¶
In general, we don't care what "Matches" actually means that much, just that it is definedby this function. However, in THIS implementation, it means the receiver tag set is a subset of the incoming tag set. E.g., if I am ListenFor(TagSet{"lung", "segmentation"}), I would return any message that contains *at least* these two tags. Other implementations may have stronger (or weaker) notions of a match.
type TraceTransporter ¶ added in v0.2.6
type WriteTransporter ¶ added in v0.2.6
type WriteTransporter interface { IdentifiableAgent Write(msg *core.Message) error }
Source Files
¶
- agent_context.go
- agent_interfaces.go
- basic_agent.go
- blackboard_transporter.go
- core_agents.go
- core_http_transporter.go
- create_agent.go
- deserialize.go
- filter_chain.go
- http_read_transporter.go
- http_stream_transporter.go
- http_tracing_transporter.go
- http_write_transporter.go
- lifecycle.go
- numpy.go
- post.go
- serialize.go
- sse.go
- tags.go