8888 0 ,
8989 "Only import after the specified timestamp" ,
9090 )
91+ positionFile = flag .String (
92+ "position-file" ,
93+ "" ,
94+ "file to store position and load position from" ,
95+ )
9196 verbose = flag .Bool (
9297 "verbose" ,
9398 false ,
@@ -114,19 +119,28 @@ func main() {
114119 panic (fmt .Sprintf ("Error when parsing schemas file: %q" , err ))
115120 }
116121
122+ var pos * posTracker
123+ if len (* positionFile ) > 0 {
124+ pos , err = NewPositionTracker (* positionFile )
125+ if err != nil {
126+ log .Fatalf ("Error instantiating position tracker: %s" , err )
127+ }
128+ defer pos .Close ()
129+ }
130+
117131 fileChan := make (chan string )
118132
119133 wg := & sync.WaitGroup {}
120134 wg .Add (* threads )
121135 for i := 0 ; i < * threads ; i ++ {
122- go processFromChan (fileChan , wg )
136+ go processFromChan (pos , fileChan , wg )
123137 }
124138
125- getFileListIntoChan (fileChan )
139+ getFileListIntoChan (pos , fileChan )
126140 wg .Wait ()
127141}
128142
129- func processFromChan (files chan string , wg * sync.WaitGroup ) {
143+ func processFromChan (pos * posTracker , files chan string , wg * sync.WaitGroup ) {
130144 tr := & http.Transport {
131145 TLSClientConfig : & tls.Config {InsecureSkipVerify : * insecureSSL },
132146 }
@@ -194,6 +208,9 @@ func processFromChan(files chan string, wg *sync.WaitGroup) {
194208 resp .Body .Close ()
195209 }
196210
211+ if pos != nil {
212+ pos .Done (file )
213+ }
197214 processed := atomic .AddUint32 (& processedCount , 1 )
198215 if processed % 100 == 0 {
199216 skipped := atomic .LoadUint32 (& skippedCount )
@@ -362,7 +379,7 @@ func encodedChunksFromPoints(points []whisper.Point, intervalIn, chunkSpan uint3
362379}
363380
364381// scan a directory and feed the list of whisper files relative to base into the given channel
365- func getFileListIntoChan (fileChan chan string ) {
382+ func getFileListIntoChan (pos * posTracker , fileChan chan string ) {
366383 filepath .Walk (
367384 * whisperDirectory ,
368385 func (path string , info os.FileInfo , err error ) error {
@@ -375,9 +392,15 @@ func getFileListIntoChan(fileChan chan string) {
375392 atomic .AddUint32 (& skippedCount , 1 )
376393 return nil
377394 }
378- if len (path ) >= 4 && path [len (path )- 4 :] = = ".wsp" {
379- fileChan <- path
395+ if len (path ) < 4 || path [len (path )- 4 :] ! = ".wsp" {
396+ return nil
380397 }
398+ if pos != nil && pos .IsDone (path ) {
399+ log .Debugf ("Skipping file %s because it was listed as already done" , path )
400+ return nil
401+ }
402+
403+ fileChan <- path
381404 return nil
382405 },
383406 )
0 commit comments