Diff
Not logged in

Differences From Artifact [acae3c9d72]:

To Artifact [96151c2200]:


11
12
13
14
15
16
17
18
19
20
21
22
23
24

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
..
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
..
86
87
88
89
90
91
92


93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111

112
113
114
115
116
117
118
119
...
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
...
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.
//
// Please review the Licences for the specific language governing permissions and limitations
// relating to use of the SAFE Network Software.

use common::{Core, CoreTimer, Message, Priority, Socket, SocketAddr, State};
use main::{ConnectionId, ConnectionMap, Event, PeerId};
use mio::{Poll, Ready, Token};
use mio::timer::Timeout;
use std::any::Any;
use std::cell::RefCell;
use std::collections::hash_map::Entry;

use std::rc::Rc;
use std::time::Duration;

// Timings for regular (non-test) builds; values are in milliseconds.
#[cfg(not(test))]
/// Inactivity timeout in milliseconds (120 s).
/// NOTE(review): presumably the silence tolerated from a peer before the connection is dropped -
/// the code consuming this constant is not shown here; confirm.
pub const INACTIVITY_TIMEOUT_MS: u64 = 120_000;
#[cfg(not(test))]
/// Heartbeat period in milliseconds (20 s), i.e. one sixth of `INACTIVITY_TIMEOUT_MS`.
const HEARTBEAT_PERIOD_MS: u64 = 20_000;

// Timings for test builds: the same two constants with much smaller values
// (presumably so that timeout-related tests finish quickly - confirm).
#[cfg(test)]
/// Inactivity timeout in milliseconds (0.9 s) when compiled with `cfg(test)`.
pub const INACTIVITY_TIMEOUT_MS: u64 = 900;
#[cfg(test)]
/// Heartbeat period in milliseconds (0.3 s) when compiled with `cfg(test)`.
const HEARTBEAT_PERIOD_MS: u64 = 300;

/// State object for an established connection to a single peer.
///
/// Created by `ActiveConnection::start`, which registers it with `Core` under `token` and
/// records it in the connection map.
pub struct ActiveConnection {
    /// Token this state is registered under in `Core`; also passed to `Socket::write` and stored
    /// as `active_connection` in the peer's connection-map entry.
    token: Token,
    /// Socket to the peer; `Message`s are read from and written to it.
    socket: Socket,
    /// Shared connection map; locked in `start` to record this connection under `their_id`.
    cm: ConnectionMap,
    /// Our own id; used as a prefix in log output.
    our_id: PeerId,
    /// Id of the remote peer; key into `cm` and attached to events such as `Event::NewMessage`.
    their_id: PeerId,
    /// Channel on which `Event`s for this connection are sent.
    event_tx: ::CrustEventSender,
    /// Heartbeat bookkeeping created via `Heartbeat::new(core, token)`.
    heartbeat: Heartbeat,
}

impl ActiveConnection {
    pub fn start(core: &mut Core,
                 poll: &Poll,
                 token: Token,
                 socket: Socket,
                 cm: ConnectionMap,
                 our_id: PeerId,
                 their_id: PeerId,
                 event: Event,
                 event_tx: ::CrustEventSender) {
        trace!("Entered state ActiveConnection: {:?} -> {:?}",
               our_id,
               their_id);

        let heartbeat = match Heartbeat::new(core, token) {
            Ok(heartbeat) => heartbeat,
            Err(e) => {
................................................................................
                debug!("{:?} - Failed to initialize heartbeat: {:?} - killing ActiveConnection \
                        to {:?}",
                       our_id,
                       e,
                       their_id);
                let _ = poll.deregister(&socket);
                let _ = event_tx.send(Event::LostPeer(their_id));
                // TODO See if this plays well with ConnectionMap manipulation below
                return;
            }
        };

        let state = Rc::new(RefCell::new(ActiveConnection {
                                             token: token,
                                             socket: socket,
................................................................................

        let _ = core.insert_state(token, state.clone());

        let mut state_mut = state.borrow_mut();
        {
            let mut guard = unwrap!(state_mut.cm.lock());
            {


                let conn_id = guard.entry(their_id).or_insert(ConnectionId {
                                                                  active_connection: None,
                                                                  currently_handshaking: 1,
                                                              });
                conn_id.currently_handshaking -= 1;
                conn_id.active_connection = Some(token);
            }
            trace!("Connection Map inserted: {:?} -> {:?}",
                   their_id,
                   guard.get(&their_id));
        }
        let _ = state_mut.event_tx.send(event);
        state_mut.read(core, poll);
    }

    fn read(&mut self, core: &mut Core, poll: &Poll) {
        loop {
            match self.socket.read::<Message>() {
                Ok(Some(Message::Data(data))) => {

                    let _ = self.event_tx.send(Event::NewMessage(self.their_id, data));
                    self.reset_receive_heartbeat(core, poll);
                }
                Ok(Some(Message::Heartbeat)) => {
                    self.reset_receive_heartbeat(core, poll);
                }
                Ok(Some(message)) => {
                    debug!("{:?} - Unexpected message: {:?}", self.our_id, message);
................................................................................
        }
    }

    #[cfg(not(test))]
    /// Helper function that returns a socket address of the connection
    pub fn peer_addr(&self) -> ::Res<SocketAddr> {
        use main::CrustError;
        self.socket
            .peer_addr()
            .map(SocketAddr)
            .map_err(CrustError::Common)
    }

    #[cfg(test)]
    /// Test-build stand-in for `peer_addr`: ignores the socket and always reports the fixed
    /// address `192.168.0.1:0`.
    // TODO(nbaksalyar) find a better way to mock connection IPs
    pub fn peer_addr(&self) -> ::Res<SocketAddr> {
        Ok(SocketAddr(unwrap!("192.168.0.1:0".parse())))
    }

    fn write(&mut self, core: &mut Core, poll: &Poll, msg: Option<(Message, Priority)>) {
        if let Err(e) = self.socket.write(poll, self.token, msg) {
            debug!("{:?} - Failed to write socket: {:?}", self.our_id, e);
            self.terminate(core, poll);
        }
    }

    fn reset_receive_heartbeat(&mut self, core: &mut Core, poll: &Poll) {
................................................................................
        if let Err(e) = self.heartbeat.reset_send(core) {
            debug!("{:?} - Failed to reset heartbeat: {:?}", self.our_id, e);
            self.terminate(core, poll);
        }
    }
}

impl State for ActiveConnection {
    fn ready(&mut self, core: &mut Core, poll: &Poll, kind: Ready) {
        if kind.is_error() || kind.is_hup() {
            trace!("{:?} Terminating connection to peer: {:?}. \
                    Event reason: {:?} - Optional Error: {:?}",
                   self.our_id,
                   self.their_id,
                   kind,







|
|





>













|


|
|
|
|



|




|
|
|
|
|







 







|







 







>
>
|
|
|
|













|

>
|







 







<
<
<
|






|


|







 







|







11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
..
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
..
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
...
132
133
134
135
136
137
138



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
...
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.
//
// Please review the Licences for the specific language governing permissions and limitations
// relating to use of the SAFE Network Software.

use common::{Core, CoreTimer, Message, Priority, Socket, State, Uid};
use main::{ConnectionId, ConnectionMap, Event};
use mio::{Poll, Ready, Token};
use mio::timer::Timeout;
use std::any::Any;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::net::SocketAddr;
use std::rc::Rc;
use std::time::Duration;

// Timings for regular (non-test) builds; values are in milliseconds.
#[cfg(not(test))]
/// Inactivity timeout in milliseconds (120 s).
/// NOTE(review): presumably the silence tolerated from a peer before the connection is dropped -
/// the code consuming this constant is not shown here; confirm.
pub const INACTIVITY_TIMEOUT_MS: u64 = 120_000;
#[cfg(not(test))]
/// Heartbeat period in milliseconds (20 s), i.e. one sixth of `INACTIVITY_TIMEOUT_MS`.
const HEARTBEAT_PERIOD_MS: u64 = 20_000;

// Timings for test builds: the same two constants with much smaller values
// (presumably so that timeout-related tests finish quickly - confirm).
#[cfg(test)]
/// Inactivity timeout in milliseconds (0.9 s) when compiled with `cfg(test)`.
pub const INACTIVITY_TIMEOUT_MS: u64 = 900;
#[cfg(test)]
/// Heartbeat period in milliseconds (0.3 s) when compiled with `cfg(test)`.
const HEARTBEAT_PERIOD_MS: u64 = 300;

/// State object for an established connection to a single peer, generic over the peer
/// identifier type `UID`.
///
/// Created by `ActiveConnection::start`, which registers it with `Core` under `token` and
/// records it in the connection map.
pub struct ActiveConnection<UID: Uid> {
    /// Token this state is registered under in `Core`; also passed to `Socket::write` and stored
    /// as `active_connection` in the peer's connection-map entry.
    token: Token,
    /// Socket to the peer; `Message<UID>`s are read from and written to it.
    socket: Socket,
    /// Shared connection map; locked in `start` to record this connection under `their_id`.
    cm: ConnectionMap<UID>,
    /// Our own id; used as a prefix in log output.
    our_id: UID,
    /// Id of the remote peer; key into `cm` and attached to events such as `Event::NewMessage`.
    their_id: UID,
    /// Channel on which `Event<UID>`s for this connection are sent.
    event_tx: ::CrustEventSender<UID>,
    /// Heartbeat bookkeeping created via `Heartbeat::new(core, token)`.
    heartbeat: Heartbeat,
}

impl<UID: Uid> ActiveConnection<UID> {
    pub fn start(core: &mut Core,
                 poll: &Poll,
                 token: Token,
                 socket: Socket,
                 cm: ConnectionMap<UID>,
                 our_id: UID,
                 their_id: UID,
                 event: Event<UID>,
                 event_tx: ::CrustEventSender<UID>) {
        trace!("Entered state ActiveConnection: {:?} -> {:?}",
               our_id,
               their_id);

        let heartbeat = match Heartbeat::new(core, token) {
            Ok(heartbeat) => heartbeat,
            Err(e) => {
................................................................................
                debug!("{:?} - Failed to initialize heartbeat: {:?} - killing ActiveConnection \
                        to {:?}",
                       our_id,
                       e,
                       their_id);
                let _ = poll.deregister(&socket);
                let _ = event_tx.send(Event::LostPeer(their_id));
                // TODO See if this plays well with ConnectionMap<UID> manipulation below
                return;
            }
        };

        let state = Rc::new(RefCell::new(ActiveConnection {
                                             token: token,
                                             socket: socket,
................................................................................

        let _ = core.insert_state(token, state.clone());

        let mut state_mut = state.borrow_mut();
        {
            let mut guard = unwrap!(state_mut.cm.lock());
            {
                let conn_id = guard
                    .entry(their_id)
                    .or_insert(ConnectionId {
                                   active_connection: None,
                                   currently_handshaking: 1,
                               });
                conn_id.currently_handshaking -= 1;
                conn_id.active_connection = Some(token);
            }
            trace!("Connection Map inserted: {:?} -> {:?}",
                   their_id,
                   guard.get(&their_id));
        }
        let _ = state_mut.event_tx.send(event);
        state_mut.read(core, poll);
    }

    fn read(&mut self, core: &mut Core, poll: &Poll) {
        loop {
            match self.socket.read::<Message<UID>>() {
                Ok(Some(Message::Data(data))) => {
                    let _ = self.event_tx
                        .send(Event::NewMessage(self.their_id, data));
                    self.reset_receive_heartbeat(core, poll);
                }
                Ok(Some(Message::Heartbeat)) => {
                    self.reset_receive_heartbeat(core, poll);
                }
                Ok(Some(message)) => {
                    debug!("{:?} - Unexpected message: {:?}", self.our_id, message);
................................................................................
        }
    }

    #[cfg(not(test))]
    /// Returns the address of the remote end of this connection's socket.
    ///
    /// A socket error is returned as `CrustError::Common`.
    pub fn peer_addr(&self) -> ::Res<SocketAddr> {
        use main::CrustError;
        match self.socket.peer_addr() {
            Ok(addr) => Ok(addr),
            Err(error) => Err(CrustError::Common(error)),
        }
    }

    #[cfg(test)]
    /// Test-build stand-in for `peer_addr`: ignores the socket and always reports the fixed
    /// address `192.168.0.1:0`.
    // TODO(nbaksalyar) find a better way to mock connection IPs
    pub fn peer_addr(&self) -> ::Res<SocketAddr> {
        Ok(unwrap!("192.168.0.1:0".parse()))
    }

    fn write(&mut self, core: &mut Core, poll: &Poll, msg: Option<(Message<UID>, Priority)>) {
        if let Err(e) = self.socket.write(poll, self.token, msg) {
            debug!("{:?} - Failed to write socket: {:?}", self.our_id, e);
            self.terminate(core, poll);
        }
    }

    fn reset_receive_heartbeat(&mut self, core: &mut Core, poll: &Poll) {
................................................................................
        if let Err(e) = self.heartbeat.reset_send(core) {
            debug!("{:?} - Failed to reset heartbeat: {:?}", self.our_id, e);
            self.terminate(core, poll);
        }
    }
}

impl<UID: Uid> State for ActiveConnection<UID> {
    fn ready(&mut self, core: &mut Core, poll: &Poll, kind: Ready) {
        if kind.is_error() || kind.is_hup() {
            trace!("{:?} Terminating connection to peer: {:?}. \
                    Event reason: {:?} - Optional Error: {:?}",
                   self.our_id,
                   self.their_id,
                   kind,