feed.gno

3.65 Kb ยท 159 lines
  1package static
  2
  3import (
  4	"bufio"
  5	"bytes"
  6	"errors"
  7
  8	"gno.land/p/demo/gnorkle/feed"
  9	"gno.land/p/demo/gnorkle/gnorkle"
 10	"gno.land/p/demo/gnorkle/ingesters/single"
 11	"gno.land/p/demo/gnorkle/message"
 12	"gno.land/p/demo/gnorkle/storage/simple"
 13	"gno.land/p/demo/ufmt"
 14)
 15
 16// Feed is a static feed.
 17type Feed struct {
 18	id            string
 19	isLocked      bool
 20	valueDataType string
 21	ingester      gnorkle.Ingester
 22	storage       gnorkle.Storage
 23	tasks         []feed.Task
 24}
 25
 26// NewFeed creates a new static feed.
 27func NewFeed(
 28	id string,
 29	valueDataType string,
 30	ingester gnorkle.Ingester,
 31	storage gnorkle.Storage,
 32	tasks ...feed.Task,
 33) *Feed {
 34	return &Feed{
 35		id:            id,
 36		valueDataType: valueDataType,
 37		ingester:      ingester,
 38		storage:       storage,
 39		tasks:         tasks,
 40	}
 41}
 42
 43// NewSingleValueFeed is a convenience function  for creating a static feed
 44// that autocommits a value after a single ingestion.
 45func NewSingleValueFeed(
 46	id string,
 47	valueDataType string,
 48	tasks ...feed.Task,
 49) *Feed {
 50	return NewFeed(
 51		id,
 52		valueDataType,
 53		&single.ValueIngester{},
 54		simple.NewStorage(1),
 55		tasks...,
 56	)
 57}
 58
 59// ID returns the feed's ID.
 60func (f Feed) ID() string {
 61	return f.id
 62}
 63
 64// Type returns the feed's type.
 65func (f Feed) Type() feed.Type {
 66	return feed.TypeStatic
 67}
 68
 69// Ingest ingests a message into the feed. It either adds the value to the ingester's
 70// pending values or commits the value to the storage.
 71func (f *Feed) Ingest(funcType message.FuncType, msg, providerAddress string) error {
 72	if f == nil {
 73		return feed.ErrUndefined
 74	}
 75
 76	if f.isLocked {
 77		return errors.New("feed locked")
 78	}
 79
 80	switch funcType {
 81	case message.FuncTypeIngest:
 82		// Autocommit the ingester's value if it's a single value ingester
 83		// because this is a static feed and this is the only value it will ever have.
 84		if canAutoCommit, err := f.ingester.Ingest(msg, providerAddress); canAutoCommit && err == nil {
 85			if err := f.ingester.CommitValue(f.storage, providerAddress); err != nil {
 86				return err
 87			}
 88
 89			f.isLocked = true
 90		} else if err != nil {
 91			return err
 92		}
 93
 94	case message.FuncTypeCommit:
 95		if err := f.ingester.CommitValue(f.storage, providerAddress); err != nil {
 96			return err
 97		}
 98
 99		f.isLocked = true
100
101	default:
102		return errors.New("invalid message function " + string(funcType))
103	}
104
105	return nil
106}
107
108// Value returns the feed's latest value, it's data type, and whether or not it can
109// be safely consumed. In this case it uses `f.isLocked` because, this being a static
110// feed, it will only ever have one value; once that value is committed the feed is locked
111// and there is a valid, non-empty value to consume.
112func (f Feed) Value() (feed.Value, string, bool) {
113	return f.storage.GetLatest(), f.valueDataType, f.isLocked
114}
115
116// MarshalJSON marshals the components of the feed that are needed for
117// an agent to execute tasks and send values for ingestion.
118func (f Feed) MarshalJSON() ([]byte, error) {
119	buf := new(bytes.Buffer)
120	w := bufio.NewWriter(buf)
121
122	w.Write([]byte(
123		`{"id":"` + f.id +
124			`","type":"` + ufmt.Sprintf("%d", int(f.Type())) +
125			`","value_type":"` + f.valueDataType +
126			`","tasks":[`),
127	)
128
129	first := true
130	for _, task := range f.tasks {
131		if !first {
132			w.WriteString(",")
133		}
134
135		taskJSON, err := task.MarshalJSON()
136		if err != nil {
137			return nil, err
138		}
139
140		w.Write(taskJSON)
141		first = false
142	}
143
144	w.Write([]byte("]}"))
145	w.Flush()
146
147	return buf.Bytes(), nil
148}
149
150// Tasks returns the feed's tasks. This allows task consumers to extract task
151// contents without having to marshal the entire feed.
152func (f Feed) Tasks() []feed.Task {
153	return f.tasks
154}
155
156// IsActive returns true if the feed is accepting ingestion requests from agents.
157func (f Feed) IsActive() bool {
158	return !f.isLocked
159}