gusucode.com > target工具箱matlab源码程序 > target/codertarget/rtos/targetservices/ASIOSender.cpp

    /* Copyright 2013-2016 The MathWorks, Inc. */
#include "ASIOSender.hpp"
#include <boost/foreach.hpp>
#include <boost/iterator/transform_iterator.hpp>
#include <boost/chrono.hpp>

namespace coder { namespace tgtsvc {

/** Functor to create a boost::asio::const_buffer from a Message*. */
struct MessageToBuffer {
    boost::asio::const_buffer operator()(const Message *msg) const {
        return boost::asio::const_buffer(msg->transmitStart(), msg->transmitSize());
    }
};

/**
 * Implementation of boost::asio::ConstBufferSequence for gather writes. 
 * T must be a sequence of Message*
 */
template <typename T>
struct ConstBufferSequence {
    typedef boost::asio::const_buffer value_type;
    typedef boost::transform_iterator<MessageToBuffer, typename T::const_iterator, boost::asio::const_buffer> const_iterator;

    ConstBufferSequence(T &l) : list_(l) {}

    /// clears the list and deletes all the messages in it
    void clearAndDelete() {
        while (!list_.empty()) {
            delete(list_.front());
            list_.pop_front();
        }
    }

    const_iterator begin() const { return const_iterator(list_.begin()); }
    const_iterator end() const { return const_iterator(list_.end()); }

    T &list_;
};

template <typename AsyncWriteStream>
ASIOSender<AsyncWriteStream>::ASIOSender(AsyncWriteStream &s, ErrorCallback ec) : socket_(s), 
    errorCallback_(ec), toSend_(QUEUE_SIZE, NULL), beingSent_(QUEUE_SIZE, NULL), status_(TSE_SUCCESS),
    waitingSenders_(0), inProgress_(false)
{
    toSend_.clear();
    beingSent_.clear();
}

template <typename AsyncWriteStream>
ASIOSender<AsyncWriteStream>::~ASIOSender()
{
    boost::unique_lock<boost::mutex> lock(mtx_);
    status_ = TSE_ERROR;
    while (waitingSenders_ > 0) {
        lock.unlock();
        full_.notify_all();
        boost::this_thread::sleep(boost::posix_time::milliseconds(20));
        lock.lock();
    }
    clearQueues();
}

template <typename AsyncWriteStream>
TSEStatus ASIOSender<AsyncWriteStream>::enqueue(Message *msg, Message::Priority p)
{
    boost::unique_lock<boost::mutex> lock(mtx_);
    while (toSend_.size() >= QUEUE_SIZE && status_ == TSE_SUCCESS) {
        ++waitingSenders_;

        try {
            full_.wait(lock);
        } 
        
        // don't send if the wait throws an error
        catch (...) {
            --waitingSenders_;
            return TSE_RESOURCE_UNAVAILABLE;
        }

        --waitingSenders_;
    }

    if (status_ == TSE_SUCCESS) {
        toSend_.push_back(msg);
        if (!inProgress_) {
            startSending();
        }
        return TSE_SUCCESS;
    } else {
        return TSE_ERROR;
    }
}

template <typename AsyncWriteStream>
bool ASIOSender<AsyncWriteStream>::pending()
{
    boost::unique_lock<boost::mutex> lock(mtx_);
    return (toSend_.size() != 0 || inProgress_ == true);
}

/**
 * Starts another asynchronous send if there is data available.
 * Required preconditions:
 *  mtx_ must be locked
 *  inProgress_ must be false
 *  beingSent_ must be empty
 */
template <typename AsyncWriteStream>
void ASIOSender<AsyncWriteStream>::startSending()
{
    assert(inProgress_ == false);
    if (toSend_.empty()) return;

    assert(beingSent_.empty());
    beingSent_.swap(toSend_);
    full_.notify_all();

    ConstBufferSequence<MessageQueue> buffSeq(beingSent_);
    inProgress_ = true;
    boost::asio::async_write(socket_,
        buffSeq,
        boost::bind(&ASIOSender::sendComplete, this,
        boost::asio::placeholders::error));
}

template <typename AsyncWriteStream>
void ASIOSender<AsyncWriteStream>::sendComplete(const boost::system::error_code &error)
{
    BOOST_FOREACH(Message *m, beingSent_) {
        delete m;
    }
    beingSent_.clear();

    boost::lock_guard<boost::mutex> guard(mtx_);
    inProgress_ = false;
    if (error) {
        status_ = TSE_ERROR;
        clearQueues();
        errorCallback_(error);
    } else {
        startSending();
    }
}

template <typename AsyncWriteStream>
void ASIOSender<AsyncWriteStream>::clearQueues()
{
    BOOST_FOREACH(Message *m, beingSent_) {
        delete m;
    }
    beingSent_.clear();

    BOOST_FOREACH(Message *m, toSend_) {
        delete m;
    }
    toSend_.clear();
}

template class ASIOSender<boost::asio::ip::tcp::socket>;
template class ASIOSender<boost::asio::serial_port>;

}}