@@ -79,7 +79,6 @@ namespace graphchi {
7979 /* *
8080 * Concurrency control
8181 */
82- mutex modification_lock;
8382 mutex schedulerlock;
8483 mutex shardlock;
8584
@@ -205,6 +204,7 @@ namespace graphchi {
205204 * Initialize streaming shards in the start of each iteration.
206205 */
207206 virtual void initialize_sliding_shards () {
207+ state = " initialize-shards" ;
208208 shardlock.lock ();
209209 if (this ->sliding_shards .empty ()) {
210210 for (int p=0 ; p < this ->nshards ; p++) {
@@ -283,7 +283,7 @@ namespace graphchi {
283283 usleep (1000000 ); // Sleep 1 sec
284284 return false ;
285285 }
286- modification_lock.lock ();
286+ this -> modification_lock .lock ();
287287 added_edges++;
288288 int shard = get_shard_for (dst);
289289 int srcshard = get_shard_for (src);
@@ -306,15 +306,15 @@ namespace graphchi {
306306
307307 // Add edge to buffers
308308 new_edge_buffers[shard][srcshard]->add (src, dst, edata);
309- modification_lock.unlock ();
309+ this -> modification_lock .unlock ();
310310 return true ;
311311 }
312312
313313 void add_task (vid_t vid) {
314314 if (this ->scheduler != NULL ) {
315- modification_lock.lock ();
315+ this -> modification_lock .lock ();
316316 this ->scheduler ->add_task (vid);
317- modification_lock.unlock ();
317+ this -> modification_lock .unlock ();
318318 }
319319 }
320320
@@ -442,23 +442,25 @@ namespace graphchi {
442442 }
443443
444444
445- virtual void load_before_updates (std::vector<svertex_t > &vertices) {
445+ virtual void load_before_updates (std::vector<svertex_t > &vertices) {
446+ state = " load-edges" ;
447+
446448 this ->base_engine ::load_before_updates (vertices);
447449
448450#ifdef SUPPORT_DELETIONS
449451 for (unsigned int i=0 ; i < (unsigned int )vertices.size (); i++) {
450452 deletecounts[this ->exec_interval ] += vertices[i].deleted_inc ;
451453 }
452454 #endif
455+
456+ state = " execute-updates" ;
453457 }
454458
455459
456460 virtual void init_vertices (std::vector<svertex_t > &vertices,
457461 graphchi_edge<EdgeDataType> * &edata) {
458- modification_lock.lock ();
459462 base_engine::init_vertices (vertices, edata);
460463 incorporate_buffered_edges (this ->exec_interval , this ->sub_interval_st , this ->sub_interval_en , vertices);
461- modification_lock.unlock ();
462464 }
463465
464466
@@ -539,7 +541,7 @@ namespace graphchi {
539541 state = " commit-ingests" ;
540542 vid_t maxwindow = 4000000 ; // FIXME: HARDCODE
541543 size_t mem_budget = this ->membudget_mb * 1024 * 1024 ;
542- modification_lock.lock ();
544+ this -> modification_lock .lock ();
543545
544546 // Clean up sliding shards
545547 // NOTE: there is a problem since this will waste
@@ -858,7 +860,7 @@ namespace graphchi {
858860 shardlock.unlock ();
859861 }
860862 init_buffers ();
861- modification_lock.unlock ();
863+ this -> modification_lock .unlock ();
862864 }
863865 template <typename T>
864866 void bwrite (int f, char * buf, char * &bufptr, T val) {
0 commit comments