Monitoring Distributed Streams
Presented by: Tsachi Sharfman
Instructor: Assoc. Prof. Assaf Schuster
Co-Instructor: Assoc. Prof. Daniel Keren, Haifa U.
02/02/20 Israel Innovation Summit

Problem Definition
- A set of distributed data streams, for example:
  - a mirrored web site
  - a distributed spam filtering system
  - a sensor network
- A data vector is collected from each stream.
- Each stream is infinite, so it is monitored over moving/jumping windows.
- Given: a function over the average of the data vectors.
- Given: a predetermined threshold.
- Question: did the function value cross the threshold?

Example 1: Web Page Frequency Counts
- A mirrored web site: each mirror maintains how often each page was accessed in the last 5 minutes.
- We would like to continuously maintain a list of the most frequently accessed web pages (as defined by a threshold).

Example 2: Air Quality Monitoring

- Sensors monitor the concentration of air pollutants.
- Each sensor holds a data vector comprising the measured concentrations of various pollutants (CO2, SO2, O3, etc.).
- A function of the average data vector determines the Air Quality Index (AQI).
- Alert if the AQI exceeds a given threshold.

Example 3: Variance Alert
- Sensors monitor the temperature in a server room (machine room, conference room, etc.).
- To ensure a uniform temperature, monitor the variance of the readings.
- Alert if the variance exceeds a threshold.
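The variance of the readings is not itself an average of per-sensor values, but it is a function of the average of the local vectors v_i = (x_i^2, x_i): Var = mean(x^2) - mean(x)^2. The minimal Python sketch below checks this against the standard library's population variance. The readings, the threshold, and the helper names are hypothetical, chosen only for illustration.

```python
from statistics import pvariance

def local_vector(x):
    """Data vector held by one sensor: v_i = (x_i^2, x_i)."""
    return (x * x, x)

def average_vector(vectors):
    """Coordinate-wise average of the sensors' data vectors."""
    n = len(vectors)
    return (sum(v[0] for v in vectors) / n, sum(v[1] for v in vectors) / n)

def variance_from_average(v):
    """Var = (1/n) sum x_i^2 - ((1/n) sum x_i)^2, computed from the average vector alone."""
    mean_of_squares, mean = v
    return mean_of_squares - mean * mean

readings = [21.0, 22.5, 20.0, 23.5]   # hypothetical temperatures, one per sensor
v_avg = average_vector([local_vector(x) for x in readings])
threshold = 1.0                        # hypothetical variance threshold

print(variance_from_average(v_avg))              # 1.8125, same as pvariance(readings)
print(variance_from_average(v_avg) > threshold)  # True: raise an alert
```

Because only the average vector is needed, the variance alert fits the problem definition of a function over the average of the data vectors.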

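Temperature readings by n sensors: $x_1, \ldots, x_n$. Each sensor holds the data vector $v_i = (x_i^2, x_i)^T$. The average data vector is

$$\bar{v} = \left( \frac{1}{n}\sum_{i=1}^{n} x_i^2,\ \frac{1}{n}\sum_{i=1}^{n} x_i \right)^T$$

and the variance of all sensors is a function of $\bar{v}$ alone:

$$\mathrm{Var}(\text{all sensors}) = \frac{1}{n}\sum_{i=1}^{n} x_i^2 - \left( \frac{1}{n}\sum_{i=1}^{n} x_i \right)^2$$

Previous Work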

- Focused on linear functions (e.g., sum, average): M. Dilman and D. Raz. Efficient reactive monitoring. In INFOCOM, pages 1012-1019, 2001.
- Previous solutions for arbitrary functions included only naive algorithms, in which all data is moved to a central place. This incurs:
  - communication overhead
  - CPU overhead
  - power overhead
  - privacy issues

Frequency Count Algorithm
- Each mirror maintains a frequency count for each web page.
- One of the mirrors is designated as the coordinator.
- Synchronization, upon initialization and from time to time as dictated by the algorithm:
  - The coordinator collects the frequency counts from all mirrors.
  - It calculates the global frequency, called the estimate value and denoted by $e$.
  - It sends the estimate value to all mirrors.

Solution (continued)
Per web page, each mirror maintains:
- Reference value $v'$: the frequency count collected by the coordinator during the last synchronization event.
- Estimate value $e$: the last estimate value sent by the coordinator.
- Drift value $u = e + (v - v')$, where $v$ is the mirror's current frequency count.
Observation: since $e$ is the average of the reference values, the average of the drift values equals the average of the current frequency counts.
Conclusion: as long as all the drift values are on the same side of the threshold as the estimate value, no communication is required.

Example 4 (running example): Distributed Feature Selection
- A distributed spam mail filtering system: each mail server receives a stream of positive and negative examples.
- Select a set of features (words) to be used to build a spam classifier.
- A feature is good if its information gain is above a threshold.
[Figure: Information Gain vs. Document Index (0 to 800,000) for the features "bosnia", "ipo" and "febru".]
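The Frequency Count Algorithm and its "Solution (continued)" slide can be sketched for a single web page as follows. This is a minimal single-process illustration, not the authors' implementation: it assumes the estimate e is the average of the counts collected at synchronization, that the monitored condition is "average count > threshold", and that all mirrors are simulated in one loop. The class and function names are hypothetical.

```python
from dataclasses import dataclass

@dataclass
class Mirror:
    """State one mirror keeps for a single web page."""
    count: float            # v: current frequency count
    reference: float = 0.0  # v': count collected by the coordinator at the last sync
    estimate: float = 0.0   # e: last estimate value sent by the coordinator

    def drift(self) -> float:
        # Drift value u = e + (v - v')
        return self.estimate + (self.count - self.reference)

def synchronize(mirrors):
    """Coordinator collects all counts, sets e to their average, and broadcasts e."""
    e = sum(m.count for m in mirrors) / len(mirrors)
    for m in mirrors:
        m.reference = m.count
        m.estimate = e
    return e

def needs_sync(mirrors, threshold):
    """Simulates every mirror's local test: True if some mirror's drift value lies
    on the other side of the threshold from the estimate value."""
    estimate_above = mirrors[0].estimate > threshold
    return any((m.drift() > threshold) != estimate_above for m in mirrors)

mirrors = [Mirror(10), Mirror(30), Mirror(20)]
synchronize(mirrors)                  # e = 20.0
mirrors[0].count += 4                 # drift 24
mirrors[1].count += 3                 # drift 23
print(needs_sync(mirrors, 25))        # False: no communication needed
mirrors[2].count += 10                # drift 30 crosses the threshold
print(needs_sync(mirrors, 25))        # True: synchronize
```

In a real deployment each mirror would evaluate only its own drift value and contact the coordinator when it crosses the threshold; the sum of the drift values always equals the sum of the current counts, which is what makes the local test safe.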

Distributed Calculation of Information Gain
- Each server maintains a contingency table for each feature.
- We would like to determine, for each feature, whether the information gain on the average contingency table is above the threshold.

Example contingency table $C = (c_{i,j})$:

          Spam   ¬Spam
   f      0.1    0.2
   ¬f     0.2    0.5

$$IG(C) = \sum_{i \in \{1,2\}} \sum_{j \in \{1,2\}} c_{i,j} \log \frac{c_{i,j}}{(c_{i,1} + c_{i,2})(c_{1,j} + c_{2,j})}$$

Distributed Calculation of Information Gain (continued)
Note that the information gain on the average contingency table cannot be derived from the information gain on each individual contingency table:

$$C_1 = \begin{pmatrix} 0.5 & 0 \\ 0 & 0.5 \end{pmatrix}, \qquad C_2 = \begin{pmatrix} 0 & 0.5 \\ 0.5 & 0 \end{pmatrix}$$

$$IG(C_1) = IG(C_2) = 1, \qquad IG\left( \frac{C_1 + C_2}{2} \right) = 0$$

(The logarithm is base 2, with $0 \log 0 = 0$.)

Novel Geometric Approach
- Each node holds a statistics vector.
- Color the vector space:
  - Grey: function > threshold
  - White: function <= threshold
- Goal: determine the color of the global data vector (the average).
- Observation: the average is in the convex hull of the drift vectors.
- If the convex hull is monochromatic, then the average has the same color.
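The information gain formula and the C_1/C_2 counterexample can be checked numerically. The sketch below uses base-2 logarithms and treats 0 log 0 as 0, which is what IG(C_1) = 1 implies. It also shows why the geometric approach needs the whole convex hull to be monochromatic rather than just its vertices: with a threshold of 0.5 (an arbitrary value picked for illustration), C_1 and C_2 are both grey, yet their average is white.

```python
from math import log2

def information_gain(c):
    """IG of a 2x2 contingency table c whose entries sum to 1; 0*log(0) is taken as 0."""
    ig = 0.0
    for i in range(2):
        row = c[i][0] + c[i][1]
        for j in range(2):
            col = c[0][j] + c[1][j]
            if c[i][j] > 0:
                ig += c[i][j] * log2(c[i][j] / (row * col))
    return ig

def average_table(*tables):
    """Entry-wise average of several 2x2 contingency tables."""
    n = len(tables)
    return [[sum(t[i][j] for t in tables) / n for j in range(2)] for i in range(2)]

C1 = [[0.5, 0.0], [0.0, 0.5]]
C2 = [[0.0, 0.5], [0.5, 0.0]]
threshold = 0.5  # hypothetical threshold

for name, table in [("C1", C1), ("C2", C2), ("avg", average_table(C1, C2))]:
    ig = information_gain(table)
    color = "grey" if ig > threshold else "white"
    print(f"{name}: IG = {ig:.3f} ({color})")
# C1: IG = 1.000 (grey)
# C2: IG = 1.000 (grey)
# avg: IG = 0.000 (white)
```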

How do we know the convex hull is monochromatic, without global/central knowledge?

Distributively Bounding the Convex Hull
- A reference point is known to all nodes.
- Each node constructs a ball.
- Theorem: the convex hull is contained in the union of the balls.

Basic Algorithm

- The monitored function and threshold induce a coloring over the vector space.
- An initial estimate vector is calculated.
- Each node checks the color of its drift ball, whose diameter is the segment between the estimate vector and the node's drift vector.
- If any ball is not monochromatic, the nodes are synchronized.

Reuters Corpus (RCV1-v2)
- 800,000+ news stories, Aug 20, 1996 to Aug 19, 1997
- Corporate/Industrial tagging simulates spam
[Figure, two panels, n = 10: Information Gain vs. Document Index (0 to 800,000), and Broadcast Messages (x1000) vs. Threshold (0 to 0.006), for the features "bosnia", "ipo" and "febru" and the Naive Alg.]
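The Basic Algorithm slide can be sketched in Python. This is a minimal illustration under stated assumptions, not the authors' implementation: the monitored function is assumed to be the Euclidean norm of the average vector, chosen because the monochromatic-ball test is then exact (over a ball with center c and radius r the norm ranges over [max(0, ||c|| - r), ||c|| + r]); the data is a hypothetical random walk; and all nodes are simulated in one process. The helper `covered_by_balls` illustrates the bounding theorem; all names are hypothetical.

```python
import math
import random

def add(a, b):
    return [x + y for x, y in zip(a, b)]

def sub(a, b):
    return [x - y for x, y in zip(a, b)]

def mean(vectors):
    n = len(vectors)
    return [sum(col) / n for col in zip(*vectors)]

def norm(a):
    return math.hypot(*a)

def drift_ball(e, u):
    """Ball whose diameter is the segment from the estimate vector e to the drift vector u."""
    center = [(x + y) / 2 for x, y in zip(e, u)]
    return center, norm(sub(u, e)) / 2

def covered_by_balls(p, balls, tol=1e-9):
    """Bounding theorem check: is point p inside at least one of the balls?"""
    return any(norm(sub(p, c)) <= r + tol for c, r in balls)

def ball_is_monochromatic(center, radius, threshold):
    """Exact for the hypothetical f(x) = ||x||: over the ball, f ranges over
    [max(0, ||c|| - r), ||c|| + r]. Grey means f > threshold, white means f <= threshold."""
    c = norm(center)
    return max(0.0, c - radius) > threshold or c + radius <= threshold

def local_constraints_hold(e, current, reference, threshold):
    """Every node checks its own drift ball; False means some node would trigger a sync."""
    for v, v_ref in zip(current, reference):
        u = add(e, sub(v, v_ref))          # drift vector u_i = e + (v_i - v'_i)
        if not ball_is_monochromatic(*drift_ball(e, u), threshold):
            return False
    return True

def simulate(n=10, d=2, threshold=5.0, steps=200, seed=0):
    """Run the basic algorithm on hypothetical random-walk data; return the number of syncs."""
    rng = random.Random(seed)
    current = [[rng.gauss(3.0, 1.0) for _ in range(d)] for _ in range(n)]
    reference = [v[:] for v in current]
    e = mean(reference)                     # initial estimate vector
    syncs = 0
    for _ in range(steps):
        current = [[x + rng.gauss(0.0, 0.05) for x in v] for v in current]
        if local_constraints_hold(e, current, reference, threshold):
            # No communication: the true average must have the same color as e.
            assert (norm(mean(current)) > threshold) == (norm(e) > threshold)
        else:
            syncs += 1                      # synchronize: collect all vectors, recompute e
            reference = [v[:] for v in current]
            e = mean(reference)
    return syncs

print(simulate())
```

The `assert` inside `simulate` encodes the guarantee behind the algorithm: every drift ball contains e, so if all balls are monochromatic they share e's color, and the true average (which, with e equal to the average of the reference vectors, lies in the convex hull of the drift vectors) has that color too. Steps that pass the local check cost no messages, whereas the naive algorithm would ship all n vectors on every step.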

Trade-off: Accuracy vs. Performance
- Inefficiency arises when the value of the function on the average is close to the threshold.
- Performance can be enhanced at the cost of a less accurate result: set an error margin around the threshold value.
[Figure: Broadcast Messages (x1000) vs. Error Margin (0% to 50%) for the features "bosnia", "ipo" and "febru".]

Scalability
[Figure, two panels labeled 0.006 and 0.003: Broadcast Messages (x1000) vs. Number of Nodes (0 to 100) for "bosnia", "ipo", "febru" and the naive algorithm.]
- The number of messages per node is constant.

Balancing
- Globally calculating the average is costly.
- It is often possible to average only some of the data vectors.

Performance Analysis
[Figure: two 2-D plots, both axes from -10 to 10.]

Performance Analysis (continued)
[Figure, two panels showing the expected global statistics vector E[v], a local statistics vector v_i, the distance sphere B(E[v(t)], v_i(t)), and dist(E[v], f, r).]

Upper Bounds on Probability of Constraint Violation
[Formulas: two upper bounds on Pr_violation, in terms of the variances V[X_i] for i = 1, ..., d, the distance D_global, and N.]

Tiered Sensor Networks
A network comprising two types of sensors, Macro Nodes and Motes:
- Motes: simple, inexpensive sensing units; resource constrained; based on 8-bit processors.
- Macro Nodes: less resource constrained; based on 32-bit processors; support more advanced OS and development tools.

Monitoring Sensor Networks (1)
- A spanning tree is constructed over the connectivity graph.
- The initial measurement vector is aggregated over the tree and flooded to all motes.
- The motes use the aggregated vector as the estimate vector.
- An attempt is made to balance constraint violations within the cluster (intra-cluster balancing):
  - The Cluster Head iteratively selects motes and requests their drift vectors.
  - Balancing succeeds if the average of the drift vectors collected from the motes creates a monochromatic ball with the estimate vector.

Monitoring Sensor Networks (2)
- If intra-cluster balancing fails, an attempt is made to balance the constraint violation by passing a token among the Cluster Heads (extra-cluster balancing):
  - The token holds the average of the drift vectors held by the motes in the clusters the token has visited.
  - Upon receiving the token, a Cluster Head collects the drift vectors from its motes and adds them to the token.
- If extra-cluster balancing fails, the vector held by the token is flooded to the motes, which use it as the new estimate vector.

Monitoring Sensor Networks (3)
- Token traversal is implemented as a depth-first search (DFS).
- Several tokens may traverse the network simultaneously, in which case they may need to merge.

Data Set
- 144x36 data points of temperature readings in the northern hemisphere.
- Readings are taken every 6 hours for a period of one year.
- Strong spatial and temporal correlation among the readings.
- The average temperature ranges from 3.15 to 15 degrees Centigrade.
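The intra-cluster and extra-cluster balancing steps from the "Monitoring Sensor Networks" slides (and the general idea on the Balancing slide) can be sketched as follows. This is a simplified single-process illustration with several assumptions: the monitored function is again taken to be the Euclidean norm (so the monochromatic-ball test is exact), the Cluster Head polls motes in list order, the token visits Cluster Heads in DFS order over a given tree, the token stores a running sum and count instead of only the average, and token merging is not modelled. All names are hypothetical.

```python
import math

def mean(vectors):
    n = len(vectors)
    return [sum(col) / n for col in zip(*vectors)]

def ball_ok(e, u, threshold):
    """Is the ball with diameter [e, u] monochromatic? Exact for the hypothetical f(x) = ||x||."""
    center = [(a + b) / 2 for a, b in zip(e, u)]
    r = math.dist(e, u) / 2
    c = math.hypot(*center)
    return max(0.0, c - r) > threshold or c + r <= threshold

def intra_cluster_balance(e, violator, other_motes, threshold):
    """Cluster Head polls motes one at a time (here: in list order) and succeeds once
    the ball spanned by e and the average of the collected drift vectors is monochromatic."""
    collected = [violator]
    for u in other_motes:
        collected.append(u)
        if ball_ok(e, mean(collected), threshold):
            return True, collected
    return False, collected

def dfs_order(tree, root):
    """Depth-first visiting order over a tree of Cluster Heads given as {head: [children]}."""
    order, stack, seen = [], [root], set()
    while stack:
        head = stack.pop()
        if head in seen:
            continue
        seen.add(head)
        order.append(head)
        stack.extend(reversed(tree.get(head, [])))
    return order

def extra_cluster_balance(e, collected, clusters, tree, root, threshold):
    """Pass a token among Cluster Heads. The token keeps a running sum and count, so
    sum / count is the average of all drift vectors collected so far."""
    token_sum = [sum(col) for col in zip(*collected)]
    token_count = len(collected)
    for head in dfs_order(tree, root)[1:]:       # root's cluster is already in the token
        for u in clusters[head]:
            token_sum = [s + x for s, x in zip(token_sum, u)]
            token_count += 1
        average = [s / token_count for s in token_sum]
        if ball_ok(e, average, threshold):
            return True, average
    # Failure: the token's vector is flooded to the motes as the new estimate vector.
    return False, [s / token_count for s in token_sum]

e, threshold = [3.0, 0.0], 4.0
ok, collected = intra_cluster_balance(e, [5.0, 0.0], [[4.5, 0.0]], threshold)
print(ok)  # False: intra-cluster balancing fails
tree = {"A": ["B", "C"]}
clusters = {"B": [[1.0, 0.0]], "C": [[2.0, 0.0]]}
print(extra_cluster_balance(e, collected, clusters, tree, "A", threshold))  # (True, [3.5, 0.0])
```

If the token visits every cluster without success, its vector is the average of all drift vectors, which (assuming e is the average of the reference vectors) equals the current global average; flooding it as the new estimate then amounts to a full synchronization.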

Experimental Results: Threshold
[Figure, two panels: Mote Messages (x1000) vs. Threshold (°C) and Cluster Head Messages (x1000) vs. Threshold (°C), threshold from -30 to 50; legend: "Decent. Alg." and "Naive Alg."]

Experimental Results: Error Margin
[Figure, two panels: Mote Messages (x1000) vs. Error Margin (°C) and Cluster Head Messages (x1000) vs. Error Margin (°C), error margin from 0 to 7.]

Experimental Results: Cluster Size
[Figure, two panels: Mote Messages (x1000) vs. Cluster Size and Cluster Head Messages vs. Cluster Size, cluster size from 0 to 5000.]

Future Work
- Monitoring top-k items
- Additional applications
- Large-scale networks

Questions?

Bounding Theorem Proof (1)

Bounding Theorem Proof (2)

Bounding Theorem Proof (3)
[Figure: points A, B, C.]

Bounding Theorem Proof (4)
[Figure: points A, B, C.]

Bounding Theorem Proof (5)
[Figure: points A, B, C.]

Window Size
[Figure: Broadcast Messages (x1000) vs. Window Size (6,700 to 13,400) for the features "bosnia", "ipo" and "febru".]

Simultaneous Features
[Figure: Broadcast Messages (x1000) vs. Number of Monitored Features (0 to 5000); legend: "Decnt. Alg." and "Naive Alg."]