feed.gno
3.65 Kb ยท 159 lines
1package static
2
3import (
4 "bufio"
5 "bytes"
6 "errors"
7
8 "gno.land/p/demo/gnorkle/feed"
9 "gno.land/p/demo/gnorkle/gnorkle"
10 "gno.land/p/demo/gnorkle/ingesters/single"
11 "gno.land/p/demo/gnorkle/message"
12 "gno.land/p/demo/gnorkle/storage/simple"
13 "gno.land/p/demo/ufmt"
14)
15
16// Feed is a static feed.
17type Feed struct {
18 id string
19 isLocked bool
20 valueDataType string
21 ingester gnorkle.Ingester
22 storage gnorkle.Storage
23 tasks []feed.Task
24}
25
26// NewFeed creates a new static feed.
27func NewFeed(
28 id string,
29 valueDataType string,
30 ingester gnorkle.Ingester,
31 storage gnorkle.Storage,
32 tasks ...feed.Task,
33) *Feed {
34 return &Feed{
35 id: id,
36 valueDataType: valueDataType,
37 ingester: ingester,
38 storage: storage,
39 tasks: tasks,
40 }
41}
42
43// NewSingleValueFeed is a convenience function for creating a static feed
44// that autocommits a value after a single ingestion.
45func NewSingleValueFeed(
46 id string,
47 valueDataType string,
48 tasks ...feed.Task,
49) *Feed {
50 return NewFeed(
51 id,
52 valueDataType,
53 &single.ValueIngester{},
54 simple.NewStorage(1),
55 tasks...,
56 )
57}
58
59// ID returns the feed's ID.
60func (f Feed) ID() string {
61 return f.id
62}
63
64// Type returns the feed's type.
65func (f Feed) Type() feed.Type {
66 return feed.TypeStatic
67}
68
69// Ingest ingests a message into the feed. It either adds the value to the ingester's
70// pending values or commits the value to the storage.
71func (f *Feed) Ingest(funcType message.FuncType, msg, providerAddress string) error {
72 if f == nil {
73 return feed.ErrUndefined
74 }
75
76 if f.isLocked {
77 return errors.New("feed locked")
78 }
79
80 switch funcType {
81 case message.FuncTypeIngest:
82 // Autocommit the ingester's value if it's a single value ingester
83 // because this is a static feed and this is the only value it will ever have.
84 if canAutoCommit, err := f.ingester.Ingest(msg, providerAddress); canAutoCommit && err == nil {
85 if err := f.ingester.CommitValue(f.storage, providerAddress); err != nil {
86 return err
87 }
88
89 f.isLocked = true
90 } else if err != nil {
91 return err
92 }
93
94 case message.FuncTypeCommit:
95 if err := f.ingester.CommitValue(f.storage, providerAddress); err != nil {
96 return err
97 }
98
99 f.isLocked = true
100
101 default:
102 return errors.New("invalid message function " + string(funcType))
103 }
104
105 return nil
106}
107
108// Value returns the feed's latest value, it's data type, and whether or not it can
109// be safely consumed. In this case it uses `f.isLocked` because, this being a static
110// feed, it will only ever have one value; once that value is committed the feed is locked
111// and there is a valid, non-empty value to consume.
112func (f Feed) Value() (feed.Value, string, bool) {
113 return f.storage.GetLatest(), f.valueDataType, f.isLocked
114}
115
116// MarshalJSON marshals the components of the feed that are needed for
117// an agent to execute tasks and send values for ingestion.
118func (f Feed) MarshalJSON() ([]byte, error) {
119 buf := new(bytes.Buffer)
120 w := bufio.NewWriter(buf)
121
122 w.Write([]byte(
123 `{"id":"` + f.id +
124 `","type":"` + ufmt.Sprintf("%d", int(f.Type())) +
125 `","value_type":"` + f.valueDataType +
126 `","tasks":[`),
127 )
128
129 first := true
130 for _, task := range f.tasks {
131 if !first {
132 w.WriteString(",")
133 }
134
135 taskJSON, err := task.MarshalJSON()
136 if err != nil {
137 return nil, err
138 }
139
140 w.Write(taskJSON)
141 first = false
142 }
143
144 w.Write([]byte("]}"))
145 w.Flush()
146
147 return buf.Bytes(), nil
148}
149
150// Tasks returns the feed's tasks. This allows task consumers to extract task
151// contents without having to marshal the entire feed.
152func (f Feed) Tasks() []feed.Task {
153 return f.tasks
154}
155
156// IsActive returns true if the feed is accepting ingestion requests from agents.
157func (f Feed) IsActive() bool {
158 return !f.isLocked
159}