controller

package
v0.0.0-...-7abe34b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2025 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const DEFAULT_SUBGRAPH_POLL_SECS = 4 * time.Second
View Source
const FOLLOWUP_WINDOW = 5 // Followup refreshes are grouped to be sent at this interval
View Source
const HTTP_MAX_ATTEMPTS = 5

If the startup loader encounters an error, the indexer will have to continue syncing from the last loaded block, which might be slow. It is better to retry a few times before giving up.

View Source
const MAX_BLOCK = 999999999
View Source
const MAX_REQS_PER_SEC = 1000
View Source
const NUM_PARALLEL_WORKERS = 200 // Should be higher than multicall_max_batch for the given chain
View Source
const N_MAX_RETRIES = 500

Should be high because temporary RPC issues should not cause a crash, especially since running without RPC data isn't that bad these days.

View Source
const N_POSITIONS_REFRESH_ON_SWAP = 100
View Source
const RECENT_EVENT_WINDOW = 60

Historical events won't require followup, because an RPC call should be sync'd.

View Source
const REFRESH_CYCLE_TIME = 30 * 60
View Source
const RETRY_QUERY_MAX_WAIT = 60
View Source
const RETRY_QUERY_MIN_WAIT = 10
View Source
const SKIPPABLE_REFRESH_INTERVAL = 30 // Skippable requests (rewards) will only be refreshed this often per position
View Source
const SLOW_QUEUE_SIZE = 1200000 // On Scroll about 700000 is needed for the startup refresh.
View Source
const SUBGRAPH_SYNC_DELAY = 1

Used because subgraph synchronization is observed to be non-atomic between updating the meta latest time and updating individual tables. Gives the subgraph indexer time to index the incremental rows.

View Source
const URGENT_QUEUE_SIZE = 50000

Variables

This section is empty.

Functions

func LoadStartupCache

func LoadStartupCache(startupCacheSource string, syncer SubgraphSyncer)

func NewStartupCacheProvider

func NewStartupCacheProvider(cacheSource string, chainId types.ChainId) startupCacheProvider

Types

type BumpRefreshHandle

type BumpRefreshHandle struct {
	// contains filtered or unexported fields
}

func (*BumpRefreshHandle) Hash

func (p *BumpRefreshHandle) Hash(buf *bytes.Buffer) [32]byte

func (*BumpRefreshHandle) LabelTag

func (p *BumpRefreshHandle) LabelTag() string

func (*BumpRefreshHandle) RefreshQuery

func (p *BumpRefreshHandle) RefreshQuery(query *loader.ICrocQuery)

func (*BumpRefreshHandle) RefreshTime

func (p *BumpRefreshHandle) RefreshTime() int64

func (*BumpRefreshHandle) Skippable

func (p *BumpRefreshHandle) Skippable() bool

type CombinedSubgraphSyncer

type CombinedSubgraphSyncer struct {
	// contains filtered or unexported fields
}

func NewCombinedSubgraphSyncer

func NewCombinedSubgraphSyncer(controller *Controller, chainConfig loader.ChainConfig, network types.NetworkName, startupCacheDir string, startupCache string) *CombinedSubgraphSyncer

func NewCombinedSubgraphSyncerAtStart

func NewCombinedSubgraphSyncerAtStart(controller *Controller, chainConfig loader.ChainConfig, network types.NetworkName, startBlocks loader.SubgraphStartBlocks, startupCache string) *CombinedSubgraphSyncer

func (*CombinedSubgraphSyncer) ChainId

func (s *CombinedSubgraphSyncer) ChainId() types.ChainId

func (*CombinedSubgraphSyncer) IngestEntries

func (s *CombinedSubgraphSyncer) IngestEntries(table string, entriesData []byte, startBlock, endBlock int) (int, bool, error)

func (*CombinedSubgraphSyncer) PollSubgraphUpdates

func (s *CombinedSubgraphSyncer) PollSubgraphUpdates()

func (*CombinedSubgraphSyncer) SetStartBlocks

func (s *CombinedSubgraphSyncer) SetStartBlocks(startBlocks loader.SubgraphStartBlocks)

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

func New

func NewOnQuery

func NewOnQuery(netCfg loader.NetworkConfig, cache *cache.MemoryCache, query loader.ICrocQuery) *Controller

func (*Controller) OnNetwork

func (c *Controller) OnNetwork(network types.NetworkName) *ControllerOverNetwork

func (*Controller) SpinUntilLiqSync

func (c *Controller) SpinUntilLiqSync()

func (*Controller) StartupSubgraphSyncDone

func (c *Controller) StartupSubgraphSyncDone()

Since the subgraph is loaded in chronological order, the refresher receives old positions first: it would spend roughly 20 minutes refreshing long-removed positions while newer positions are stuck with no APR visible in the UI. So on startup it's better to pause the refresher, unpause it after the subgraph is fully loaded, and then refresh all positions in reverse chronological order. Just a few seconds of running the refresher will be enough to refresh the most recent positions, leading to a better experience overall despite the delay before starting refreshes.

type ControllerOverNetwork

type ControllerOverNetwork struct {
	// contains filtered or unexported fields
}

func (*ControllerOverNetwork) FlushSyncCycle

func (c *ControllerOverNetwork) FlushSyncCycle(time int)

Called to indicate that all tables have completed the most recent sync cycle up to the checkpointed time.

func (*ControllerOverNetwork) IngestAggEvent

func (c *ControllerOverNetwork) IngestAggEvent(r tables.AggEvent)

func (*ControllerOverNetwork) IngestBalance

func (c *ControllerOverNetwork) IngestBalance(b tables.Balance)

func (*ControllerOverNetwork) IngestFee

func (c *ControllerOverNetwork) IngestFee(l tables.FeeChange)

func (*ControllerOverNetwork) IngestKnockout

func (c *ControllerOverNetwork) IngestKnockout(l tables.LiqChange)

func (*ControllerOverNetwork) IngestLiqChange

func (c *ControllerOverNetwork) IngestLiqChange(l tables.LiqChange)

func (*ControllerOverNetwork) IngestSwap

func (c *ControllerOverNetwork) IngestSwap(l tables.Swap)

type IMsgType

type IMsgType interface {
	// contains filtered or unexported methods
}

type IRefreshHandle

type IRefreshHandle interface {
	Hash(buf *bytes.Buffer) [32]byte
	RefreshTime() int64
	Skippable() bool
	RefreshQuery(query *loader.ICrocQuery)
	LabelTag() string
}

type KnockoutAliveHandle

type KnockoutAliveHandle struct {
	// contains filtered or unexported fields
}

func (*KnockoutAliveHandle) Hash

func (p *KnockoutAliveHandle) Hash(buf *bytes.Buffer) [32]byte

func (*KnockoutAliveHandle) LabelTag

func (p *KnockoutAliveHandle) LabelTag() string

func (*KnockoutAliveHandle) RefreshQuery

func (p *KnockoutAliveHandle) RefreshQuery(query *loader.ICrocQuery)

func (*KnockoutAliveHandle) RefreshTime

func (p *KnockoutAliveHandle) RefreshTime() int64

func (*KnockoutAliveHandle) Skippable

func (p *KnockoutAliveHandle) Skippable() bool

type KnockoutPostHandle

type KnockoutPostHandle struct {
	// contains filtered or unexported fields
}

func (*KnockoutPostHandle) Hash

func (p *KnockoutPostHandle) Hash(buf *bytes.Buffer) [32]byte

func (*KnockoutPostHandle) LabelTag

func (p *KnockoutPostHandle) LabelTag() string

func (*KnockoutPostHandle) RefreshQuery

func (p *KnockoutPostHandle) RefreshQuery(query *loader.ICrocQuery)

func (*KnockoutPostHandle) RefreshTime

func (p *KnockoutPostHandle) RefreshTime() int64

func (*KnockoutPostHandle) Skippable

func (p *KnockoutPostHandle) Skippable() bool

type LiquidityRefresher

type LiquidityRefresher struct {
	// contains filtered or unexported fields
}

func NewLiquidityRefresher

func NewLiquidityRefresher(query *loader.ICrocQuery) *LiquidityRefresher

func (*LiquidityRefresher) PushRefresh

func (lr *LiquidityRefresher) PushRefresh(hndl IRefreshHandle, eventTime int)

func (*LiquidityRefresher) PushRefreshPoll

func (lr *LiquidityRefresher) PushRefreshPoll(hndl IRefreshHandle)

func (*LiquidityRefresher) SetPause

func (lr *LiquidityRefresher) SetPause(pause bool)

type NormalSubgraphSyncer

type NormalSubgraphSyncer struct {
	// contains filtered or unexported fields
}

func NewSubgraphSyncer

func NewSubgraphSyncer(controller *Controller, chainConfig loader.ChainConfig, network types.NetworkName, startupCache string) *NormalSubgraphSyncer

func NewSubgraphSyncerAtStart

func NewSubgraphSyncerAtStart(controller *Controller, chainConfig loader.ChainConfig, network types.NetworkName, startBlocks loader.SubgraphStartBlocks, startupCache string) *NormalSubgraphSyncer

func (*NormalSubgraphSyncer) ChainId

func (s *NormalSubgraphSyncer) ChainId() types.ChainId

func (*NormalSubgraphSyncer) IngestEntries

func (s *NormalSubgraphSyncer) IngestEntries(table string, entriesData []byte, startBlock, endBlock int) (int, bool, error)

func (*NormalSubgraphSyncer) PollSubgraphUpdates

func (s *NormalSubgraphSyncer) PollSubgraphUpdates()

func (*NormalSubgraphSyncer) SetStartBlocks

func (s *NormalSubgraphSyncer) SetStartBlocks(startBlocks loader.SubgraphStartBlocks)

type PoolInitPriceHandle

type PoolInitPriceHandle struct {
	Pool  types.PoolLocation
	Block int
	Hist  *model.PoolTradingHistory
}

func (*PoolInitPriceHandle) Hash

func (p *PoolInitPriceHandle) Hash(buf *bytes.Buffer) [32]byte

func (*PoolInitPriceHandle) LabelTag

func (p *PoolInitPriceHandle) LabelTag() string

func (*PoolInitPriceHandle) RefreshQuery

func (p *PoolInitPriceHandle) RefreshQuery(query *loader.ICrocQuery)

func (*PoolInitPriceHandle) RefreshTime

func (p *PoolInitPriceHandle) RefreshTime() int64

func (*PoolInitPriceHandle) Skippable

func (p *PoolInitPriceHandle) Skippable() bool

type PositionRefreshHandle

type PositionRefreshHandle struct {
	// contains filtered or unexported fields
}

func (*PositionRefreshHandle) Hash

func (p *PositionRefreshHandle) Hash(buf *bytes.Buffer) [32]byte

func (*PositionRefreshHandle) LabelTag

func (p *PositionRefreshHandle) LabelTag() string

func (*PositionRefreshHandle) RefreshQuery

func (p *PositionRefreshHandle) RefreshQuery(query *loader.ICrocQuery)

func (*PositionRefreshHandle) RefreshTime

func (p *PositionRefreshHandle) RefreshTime() int64

func (*PositionRefreshHandle) Skippable

func (p *PositionRefreshHandle) Skippable() bool

type RewardsRefreshHandle

type RewardsRefreshHandle struct {
	// contains filtered or unexported fields
}

func (*RewardsRefreshHandle) Hash

func (p *RewardsRefreshHandle) Hash(buf *bytes.Buffer) [32]byte

func (*RewardsRefreshHandle) LabelTag

func (p *RewardsRefreshHandle) LabelTag() string

func (*RewardsRefreshHandle) RefreshQuery

func (p *RewardsRefreshHandle) RefreshQuery(query *loader.ICrocQuery)

func (*RewardsRefreshHandle) RefreshTime

func (p *RewardsRefreshHandle) RefreshTime() int64

func (*RewardsRefreshHandle) Skippable

func (p *RewardsRefreshHandle) Skippable() bool

type SubgraphSyncer

type SubgraphSyncer interface {
	ChainId() types.ChainId
	PollSubgraphUpdates()
	SetStartBlocks(startBlocks loader.SubgraphStartBlocks)
	IngestEntries(table string, entriesData []byte, startBlock, endBlock int) (lastObs int, hasMore bool, err error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳