Merge pull request #814 from grafana/importer_recover_from_position · pythonAI/metrictank@29bd120 · GitHub
Skip to content

Commit 29bd120

Browse files
authored
Merge pull request grafana-cold-storage#814 from grafana/importer_recover_from_position
keep import position in whisper reader
2 parents 7ff0845 + 762197d commit 29bd120

4 files changed

Lines changed: 169 additions & 6 deletions

File tree

cmd/mt-whisper-importer-reader/main.go

Lines changed: 29 additions & 6 deletions
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"io"
7+
"os"
8+
"sync"
9+
)
10+
11+
type posTracker struct {
12+
sync.Mutex
13+
file string
14+
fd *os.File
15+
completedMap sync.Map
16+
wg sync.WaitGroup
17+
}
18+
19+
func NewPositionTracker(file string) (*posTracker, error) {
20+
p := &posTracker{file: file}
21+
22+
fd, err := os.Open(file)
23+
if err != nil {
24+
if !os.IsNotExist(err) {
25+
return nil, err
26+
}
27+
} else {
28+
reader := bufio.NewReader(fd)
29+
var path string
30+
for {
31+
line, isPrefix, err := reader.ReadLine()
32+
if err != nil {
33+
if err == io.EOF {
34+
break
35+
}
36+
return nil, err
37+
}
38+
39+
path += string(line)
40+
if isPrefix {
41+
continue
42+
} else {
43+
p.completedMap.Store(path, struct{}{})
44+
path = ""
45+
}
46+
}
47+
48+
fd.Close()
49+
}
50+
51+
p.fd, err = os.OpenFile(file, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
return p, nil
57+
}
58+
59+
func (p *posTracker) IsDone(path string) bool {
60+
_, ok := p.completedMap.Load(path)
61+
return ok
62+
}
63+
64+
func (p *posTracker) Done(path string) {
65+
p.completedMap.Store(path, struct{}{})
66+
p.wg.Add(1)
67+
go func() {
68+
p.Lock()
69+
defer p.Unlock()
70+
p.fd.WriteString(fmt.Sprintf("%s\n", path))
71+
p.fd.Sync()
72+
p.wg.Done()
73+
}()
74+
}
75+
76+
func (p *posTracker) Close() {
77+
p.wg.Wait()
78+
p.fd.Close()
79+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package main
2+
3+
import (
4+
"os"
5+
"testing"
6+
)
7+
8+
func TestPositionTracker(t *testing.T) {
9+
filePath := "/tmp/positionTrackerTest"
10+
clearFile := func() { os.Remove(filePath) }
11+
clearFile()
12+
defer clearFile()
13+
14+
testValue1 := "file1"
15+
testValue2 := "file2"
16+
testValue3 := "file3"
17+
18+
p1, err := NewPositionTracker(filePath)
19+
if err != nil {
20+
t.Fatalf("Error instantiating position tracker: %s", err)
21+
}
22+
p1.Done(testValue1)
23+
if !p1.IsDone(testValue1) {
24+
t.Fatalf("Expected %s to be done, but it was not", testValue1)
25+
}
26+
if p1.IsDone(testValue2) {
27+
t.Fatalf("Expected %s to not be done, but it was", testValue2)
28+
}
29+
p1.Done(testValue2)
30+
if !p1.IsDone(testValue2) {
31+
t.Fatalf("Expected %s to be done, but it was not", testValue2)
32+
}
33+
p1.Close()
34+
35+
// read the file into new instance of position tracker
36+
p2, err := NewPositionTracker(filePath)
37+
if err != nil {
38+
t.Fatalf("Error instantiating position tracker: %s", err)
39+
}
40+
if !p2.IsDone(testValue1) || !p2.IsDone(testValue2) {
41+
t.Fatalf("Expected %s and %s to be done, but it was not", testValue1, testValue2)
42+
}
43+
44+
if p2.IsDone(testValue3) {
45+
t.Fatalf("Expected %s to not be done, but it was", testValue3)
46+
}
47+
48+
p2.Done(testValue3)
49+
p2.Close()
50+
51+
// read the file into new instance of position tracker
52+
p3, err := NewPositionTracker(filePath)
53+
if err != nil {
54+
t.Fatalf("Error instantiating position tracker: %s", err)
55+
}
56+
if !p3.IsDone(testValue1) || !p3.IsDone(testValue2) || !p3.IsDone(testValue3) {
57+
t.Fatalf("Expected %s, %s and %s to be done, but it was not", testValue1, testValue2, testValue3)
58+
}
59+
}

docs/tools.md

Lines changed: 2 additions & 0 deletions

0 commit comments

Comments
 (0)