Diff
Not logged in

Differences From Artifact [9b54616ee1]:

To Artifact [f71605f3f8]:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
..
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
..
82
83
84
85
86
87
88


89
90
91
92
93
94
95
96
...
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
...
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
...
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
...
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright 2015 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under (1) the MaidSafe.net Commercial License,
// version 1.0 or later, or (2) The General Public License (GPL), version 3, depending on which
// licence you accepted on initial access to the Software (the "Licences").
//
// By contributing code to the SAFE Network Software, or to this project generally, you agree to be
// bound by the terms of the MaidSafe Contributor Agreement, version 1.0.  This, along with the
// Licenses can be found in the root directory of this project at LICENSE, COPYING and CONTRIBUTOR.
//
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.
//
// Please review the Licences for the specific language governing permissions and limitations
// relating to use of the SAFE Network Software.


use core::client::message_queue::MessageQueue;
use core::errors::CoreError;
use core::translated_events::ResponseEvent;
use maidsafe_utilities;
use routing::{Data, DataIdentifier, XorName};
use std::sync::{Arc, Mutex};
................................................................................
    /// network as informed by MessageQueue. This is blocking.
    pub fn get(&self) -> Result<Data, CoreError> {
        if let Some((ref sender, ref data_receiver)) = self.data_channel {
            let sender = sender.clone();
            let (tx, rx) = mpsc::channel();
            let _thread_canceller = ThreadCanceller(tx);
            wait_canceller(sender, rx);
            match try!(data_receiver.recv()) {
                ResponseEvent::GetResp(result) => {
                    let data = try!(result);
                    if let DataIdentifier::Immutable(..) = self.requested_id {
                        let mut msg_queue = unwrap!(self.message_queue.lock());
                        msg_queue.local_cache_insert(self.requested_name, data.clone());
                    }

                    Ok(data)
                }
................................................................................
        }
    }

    /// Extract the associated sender, if this getter holds a data channel.
    ///
    /// This helps cancel the blocking wait at will: extract the sender before calling `get()`
    /// and, while `get()` is blocking, fire `sender.send(ResponseEvent::Terminated)` to
    /// gracefully exit the receiver. Returns `None` when there is no data channel.
    pub fn get_sender(&self) -> Option<&Sender<ResponseEvent>> {
        // `map` is the direct form of the previous `and_then(|..| Some(..))`.
        self.data_channel.as_ref().map(|&(ref sender, _)| sender)
    }
}

/// `GetAccountInfoResponseGetter` is a lazy evaluated response getter for `GetAccountInfo`
/// Requests. It will wait for the `MessageQueue` to notify it of the incoming response from the
/// network.
pub struct GetAccountInfoResponseGetter {
................................................................................
    pub fn get(&self) -> Result<(u64, u64), CoreError> {
        let (ref sender, ref data_receiver) = self.data_channel;
        let sender = sender.clone();
        let (tx, rx) = mpsc::channel();
        let _thread_canceller = ThreadCanceller(tx);
        wait_canceller(sender, rx);
        let res = data_receiver.recv();
        match try!(res) {
            ResponseEvent::GetAccountInfoResp(result) => result,
            ResponseEvent::Terminated => Err(CoreError::RequestTimeout),
            _ => Err(CoreError::ReceivedUnexpectedData),
        }
    }

    /// Extract associated sender. This will help cancel the blocking wait at will if so desired.
................................................................................
    /// All that is needed is to extract the sender before doing a `get()` and then while blocking
    /// on `get()` fire `sender.send(ResponseEvent::Terminated)` to gracefully exit the receiver.
    pub fn get_sender(&self) -> &Sender<ResponseEvent> {
        // Only the sending half of the channel pair is handed out.
        let (ref sender, _) = self.data_channel;
        sender
    }
}

/// `MutationResponseGetter` is a lazily evaluated response getter for mutating network requests
/// such as PUT/POST/DELETE. It will fetch either from local cache or wait for the `MessageQueue`
/// to notify it of the incoming response from the network.
// NOTE(review): this struct only stores a channel pair and nothing here refers to a local cache;
// the "local cache" wording above may be a leftover from another getter - confirm.
pub struct MutationResponseGetter {
    // Sender/receiver pair for `ResponseEvent`s: `get()` blocks on the receiver, while the sender
    // is cloned for the timeout thread and exposed through `get_sender()`.
    data_channel: (Sender<ResponseEvent>, Receiver<ResponseEvent>),
}

impl MutationResponseGetter {
    /// Create a new instance of MutationResponseGetter
    pub fn new(data_channel: (Sender<ResponseEvent>, Receiver<ResponseEvent>))
................................................................................
    /// Get response when it comes from the network as informed by MessageQueue. This is blocking
    pub fn get(&self) -> Result<(), CoreError> {
        let (ref sender, ref data_receiver) = self.data_channel;
        let sender = sender.clone();
        let (tx, rx) = mpsc::channel();
        let _thread_canceller = ThreadCanceller(tx);
        wait_canceller(sender, rx);
        match try!(data_receiver.recv()) {
            ResponseEvent::MutationResp(result) => result,
            ResponseEvent::Terminated => Err(CoreError::RequestTimeout),
            _ => Err(CoreError::ReceivedUnexpectedData),
        }
    }

    /// Extract associated sender. This will help cancel the blocking wait at will if so desired.
................................................................................
    pub fn get_sender(&self) -> &Sender<ResponseEvent> {
        // Borrow just the sending half of the stored channel pair.
        let (ref sender, _) = self.data_channel;
        sender
    }
}

/// Spawns a detached thread named "DetachedCanceller" that enforces the request timeout.
///
/// For up to `REQ_TIMEOUT_SECS` iterations the thread checks `rx` for a cancellation signal,
/// returning quietly if one has arrived and otherwise sleeping for `SLEEP_PER_ITER` seconds.
/// If no signal shows up in that time it sends `ResponseEvent::Terminated` through `tx`; a
/// failure of that send is ignored.
fn wait_canceller(tx: Sender<ResponseEvent>, rx: Receiver<()>) {
    const SLEEP_PER_ITER: u64 = 1;

    let watchdog = move || {
        let pause = Duration::from_secs(SLEEP_PER_ITER);
        for _ in 0..REQ_TIMEOUT_SECS {
            if rx.try_recv().is_ok() {
                return;
            }
            thread::sleep(pause);
        }
        debug!("Response has timed out - firing wait canceller.");
        let _ = tx.send(ResponseEvent::Terminated);
    };

    maidsafe_utilities::thread::named("DetachedCanceller", watchdog).detach();
}

/// Guard around the sending half of a cancellation channel.
///
/// Dropping the guard sends `()` on the wrapped sender.
struct ThreadCanceller(Sender<()>);

impl Drop for ThreadCanceller {
    fn drop(&mut self) {
        // Best effort: a failed send is deliberately ignored.
        let ThreadCanceller(ref cancel_tx) = *self;
        let _ = cancel_tx.send(());
    }
}







|
|







<







 







|

|







 







>
>
|







 







|







 







|
|
|







 







|







 







|
|
|


|
|
|
|
|
|








1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

17
18
19
20
21
22
23
..
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
..
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
...
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
...
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
...
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
...
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// Copyright 2015 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under (1) the MaidSafe.net Commercial License,
// version 1.0 or later, or (2) The General Public License (GPL), version 3, depending on which
// licence you accepted on initial access to the Software (the "Licences").
//
// By contributing code to the SAFE Network Software, or to this project generally, you agree to be
// bound by the terms of the MaidSafe Contributor Agreement.  This, along with the Licenses can be
// found in the root directory of this project at LICENSE, COPYING and CONTRIBUTOR.
//
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.
//
// Please review the Licences for the specific language governing permissions and limitations
// relating to use of the SAFE Network Software.


use core::client::message_queue::MessageQueue;
use core::errors::CoreError;
use core::translated_events::ResponseEvent;
use maidsafe_utilities;
use routing::{Data, DataIdentifier, XorName};
use std::sync::{Arc, Mutex};
................................................................................
    /// network as informed by MessageQueue. This is blocking.
    pub fn get(&self) -> Result<Data, CoreError> {
        if let Some((ref sender, ref data_receiver)) = self.data_channel {
            let sender = sender.clone();
            let (tx, rx) = mpsc::channel();
            let _thread_canceller = ThreadCanceller(tx);
            wait_canceller(sender, rx);
            match data_receiver.recv()? {
                ResponseEvent::GetResp(result) => {
                    let data = result?;
                    if let DataIdentifier::Immutable(..) = self.requested_id {
                        let mut msg_queue = unwrap!(self.message_queue.lock());
                        msg_queue.local_cache_insert(self.requested_name, data.clone());
                    }

                    Ok(data)
                }
................................................................................
        }
    }

    /// Extract the associated sender, if this getter holds a data channel.
    ///
    /// This helps cancel the blocking wait at will: extract the sender before calling `get()`
    /// and, while `get()` is blocking, fire `sender.send(ResponseEvent::Terminated)` to
    /// gracefully exit the receiver. Returns `None` when there is no data channel.
    pub fn get_sender(&self) -> Option<&Sender<ResponseEvent>> {
        // `map` is the direct form of the previous `and_then(|..| Some(..))`.
        self.data_channel
            .as_ref()
            .map(|&(ref sender, _)| sender)
    }
}

/// `GetAccountInfoResponseGetter` is a lazy evaluated response getter for `GetAccountInfo`
/// Requests. It will wait for the `MessageQueue` to notify it of the incoming response from the
/// network.
pub struct GetAccountInfoResponseGetter {
................................................................................
    pub fn get(&self) -> Result<(u64, u64), CoreError> {
        let (ref sender, ref data_receiver) = self.data_channel;
        let sender = sender.clone();
        let (tx, rx) = mpsc::channel();
        let _thread_canceller = ThreadCanceller(tx);
        wait_canceller(sender, rx);
        let res = data_receiver.recv();
        match res? {
            ResponseEvent::GetAccountInfoResp(result) => result,
            ResponseEvent::Terminated => Err(CoreError::RequestTimeout),
            _ => Err(CoreError::ReceivedUnexpectedData),
        }
    }

    /// Extract associated sender. This will help cancel the blocking wait at will if so desired.
................................................................................
    /// All that is needed is to extract the sender before doing a `get()` and then while blocking
    /// on `get()` fire `sender.send(ResponseEvent::Terminated)` to gracefully exit the receiver.
    pub fn get_sender(&self) -> &Sender<ResponseEvent> {
        // Only the sending half of the channel pair is handed out.
        let (ref sender, _) = self.data_channel;
        sender
    }
}

/// `MutationResponseGetter` is a lazily evaluated response getter for mutating network requests
/// such as PUT/POST/DELETE. It will fetch either from local cache or wait for the `MessageQueue`
/// to notify it of the incoming response from the network.
// NOTE(review): this struct only stores a channel pair and nothing here refers to a local cache;
// the "local cache" wording above may be a leftover from another getter - confirm.
pub struct MutationResponseGetter {
    // Sender/receiver pair for `ResponseEvent`s: `get()` blocks on the receiver, while the sender
    // is cloned for the timeout thread and exposed through `get_sender()`.
    data_channel: (Sender<ResponseEvent>, Receiver<ResponseEvent>),
}

impl MutationResponseGetter {
    /// Create a new instance of MutationResponseGetter
    pub fn new(data_channel: (Sender<ResponseEvent>, Receiver<ResponseEvent>))
................................................................................
    /// Get response when it comes from the network as informed by MessageQueue. This is blocking
    pub fn get(&self) -> Result<(), CoreError> {
        let (ref sender, ref data_receiver) = self.data_channel;
        let sender = sender.clone();
        let (tx, rx) = mpsc::channel();
        let _thread_canceller = ThreadCanceller(tx);
        wait_canceller(sender, rx);
        match data_receiver.recv()? {
            ResponseEvent::MutationResp(result) => result,
            ResponseEvent::Terminated => Err(CoreError::RequestTimeout),
            _ => Err(CoreError::ReceivedUnexpectedData),
        }
    }

    /// Extract associated sender. This will help cancel the blocking wait at will if so desired.
................................................................................
    pub fn get_sender(&self) -> &Sender<ResponseEvent> {
        // Borrow just the sending half of the stored channel pair.
        let (ref sender, _) = self.data_channel;
        sender
    }
}

/// Starts a detached "DetachedCanceller" thread that enforces the request timeout.
///
/// The thread polls `rx` for a cancellation signal up to `REQ_TIMEOUT_SECS` times, sleeping
/// `SLEEP_PER_ITER` seconds after each unsuccessful check, and returns as soon as a signal is
/// seen. If the iterations are exhausted it sends `ResponseEvent::Terminated` through `tx`,
/// ignoring any error from that send.
fn wait_canceller(tx: Sender<ResponseEvent>, rx: Receiver<()>) {
    const SLEEP_PER_ITER: u64 = 1;

    let timeout_watch = move || {
        let nap = Duration::from_secs(SLEEP_PER_ITER);
        for _ in 0..REQ_TIMEOUT_SECS {
            if rx.try_recv().is_ok() {
                return;
            }
            thread::sleep(nap);
        }
        debug!("Response has timed out - firing wait canceller.");
        let _ = tx.send(ResponseEvent::Terminated);
    };

    maidsafe_utilities::thread::named("DetachedCanceller", timeout_watch).detach();
}

/// Guard around the sending half of a cancellation channel.
///
/// Dropping the guard sends `()` on the wrapped sender.
struct ThreadCanceller(Sender<()>);

impl Drop for ThreadCanceller {
    fn drop(&mut self) {
        // Best effort: a failed send is deliberately ignored.
        let ThreadCanceller(ref stop_tx) = *self;
        let _ = stop_tx.send(());
    }
}