backtest

package
v0.0.0-...-0765ed5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2025 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const StreamName = "backtest"

Variables

View Source
var Module = fx.Options(
	fx.Provide(
		func(conn *pgxpool.Pool, st storage.Storage, ce container.Engine) (DependecyContainer, error) {
			dc := stream.NewDependencyContainer()
			dc.AddSingleton(stream.DBDep, conn)
			dc.AddSingleton(stream.StorageDep, st)
			dc.AddSingleton(stream.ContainerEngineDep, ce)

			return dc, nil
		},
		func(jt nats.JetStreamContext, conn *pgxpool.Pool, dc DependecyContainer) (Stream, error) {
			s, err := stream.NewNATSStream(jt, StreamName, dc, conn)
			if err != nil {
				return nil, fmt.Errorf("failed to create stream: %w", err)
			}
			return s, nil
		},
	),
	fx.Invoke(
		func(g *grpc.Server, pgx *pgxpool.Pool, s Stream, st storage.Storage) error {
			backtestServer := servicer.NewBacktestServer(pgx, s)
			pb.RegisterBacktestServicerServer(g, backtestServer)
			ingestionServer := servicer.NewIngestionServer(s, st, pgx)
			pb.RegisterIngestionServicerServer(g, ingestionServer)
			return nil
		},
		func(conn *pgxpool.Pool) error {
			return repository.CreateTables(context.TODO(), conn)
		},
		func(lc fx.Lifecycle, backtestStream Stream, containers container.Engine, dependencies DependecyContainer) error {
			var backtestContainer container.Container
			var backtestEngine engine.Engine
			lc.Append(fx.Hook{
				OnStart: func(ctx context.Context) error {
					var err error
					backtestContainer, err = containers.Start(ctx, environment.GetBacktestImage(), "")
					if err != nil {
						return fmt.Errorf("error starting container: %w", err)
					}

					for range 30 {
						health, err := backtestContainer.GetHealth()
						if err != nil {
							return fmt.Errorf("error getting container health: %w", err)
						}
						if health == types.Healthy {
							break
						} else if health == types.Unhealthy {
							return errors.New("container is unhealthy")
						}
						time.Sleep(time.Second / 3)
					}
					backtestEngine, err = backtest.NewZiplineEngine(ctx, backtestContainer, nil)
					if err != nil {
						return fmt.Errorf("error creating zipline engine: %w", err)
					}
					returnEngine := func(ctx context.Context, msg stream.Message) (interface{}, error) {
						return backtestEngine, nil
					}
					dependencies.AddMethod(dependency.GetEngineKey, returnEngine)

					err = backtestStream.CommandSubscriber("ingest", "ingest", command.Ingest)
					if err != nil {
						return fmt.Errorf("error subscribing to backtest.ingest: %w", err)
					}
					err = backtestStream.CommandSubscriber("session", "run", command.SessionRun)
					if err != nil {
						return fmt.Errorf("error subscribing to backtest.start: %w", err)
					}
					err = backtestStream.CommandSubscriber("status", "update", command.UpdateIngestionStatus)
					if err != nil {
						return fmt.Errorf("error subscribing to ingestion.update: %w", err)
					}
					return nil
				},
				OnStop: func(ctx context.Context) error {
					if err := backtestContainer.Stop(); err != nil {
						return fmt.Errorf("error stopping container: %w", err)
					}
					return backtestStream.Unsubscribe()
				},
			})
			return nil
		},
	),
)

Functions

This section is empty.

Types

type DependecyContainer

type DependecyContainer stream.DependencyContainer

type Stream

type Stream stream.Stream

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳