1#include "data_request.hpp"
2#include "flow_control.hpp"
4#include "transmit_rate_control.hpp"
5#include <vanetza/access/data_request.hpp>
6#include <vanetza/access/interface.hpp>
7#include <vanetza/common/runtime.hpp>
16 m_runtime(runtime), m_trc(trc), m_access(ifc), m_queue_length(0)
20FlowControl::~FlowControl()
30 if (transmit_immediately(transmission)) {
31 m_trc.
notify(transmission);
32 transmit(
request, std::move(packet));
34 enqueue(
request, std::move(packet));
38void FlowControl::trigger()
41 auto transmission = dequeue();
43 m_trc.
notify(*transmission);
44 transmit(transmission->request, std::move(transmission->packet));
47 PendingTransmission* next = next_transmission();
49 schedule_trigger(*next);
53void FlowControl::schedule_trigger(
const Transmission& tx)
55 auto callback_delay = m_trc.
delay(tx);
56 m_runtime.
schedule(callback_delay, std::bind(&FlowControl::trigger,
this),
this);
59void FlowControl::enqueue(
const DataRequest& request, std::unique_ptr<ChunkPacket> packet)
61 const bool first_packet = empty();
62 const auto ac = map_profile_onto_ac(
request.dcc_profile);
63 auto expiry = m_runtime.
now() +
request.lifetime;
64 while (m_queue_length > 0 && m_queues[ac].size() >= m_queue_length) {
65 m_queues[ac].pop_front();
66 m_packet_drop_hook(ac, packet.get());
68 m_queues[ac].emplace_back(expiry,
request, std::move(packet));
71 schedule_trigger(m_queues[ac].back());
75boost::optional<FlowControl::PendingTransmission> FlowControl::dequeue()
77 boost::optional<PendingTransmission> transmission;
78 Queue* queue = next_queue();
80 transmission = std::move(queue->front());
87bool FlowControl::transmit_immediately(
const Transmission& transmission)
const
89 const auto ac = map_profile_onto_ac(transmission.profile());
92 bool contention =
false;
93 for (
auto it = m_queues.cbegin(); it != m_queues.end(); ++it) {
94 if (it->first >= ac && !it->second.empty()) {
100 return !contention && m_trc.
delay(transmission) == Clock::duration::zero();
103bool FlowControl::empty()
const
105 return std::all_of(m_queues.cbegin(), m_queues.cend(),
106 [](
const std::pair<access::AccessCategory, const Queue&>& kv) {
107 return kv.second.empty();
111FlowControl::Queue* FlowControl::next_queue()
113 Queue* next =
nullptr;
114 Clock::duration min_delay = Clock::duration::max();
116 for (
auto& kv : m_queues) {
117 Queue& queue = kv.second;
118 if (!queue.empty()) {
119 const auto delay = m_trc.
delay(queue.front());
120 if (delay < min_delay) {
129FlowControl::PendingTransmission* FlowControl::next_transmission()
131 Queue* queue = next_queue();
132 return queue ? &queue->front() :
nullptr;
135void FlowControl::drop_expired()
137 for (
auto& kv : m_queues) {
138 access::AccessCategory ac = kv.first;
139 Queue& queue = kv.second;
140 queue.remove_if([
this, ac](
const PendingTransmission& transmission) {
141 bool drop = transmission.expiry < m_runtime.
now();
143 m_packet_drop_hook(ac, transmission.packet.get());
150void FlowControl::transmit(
const DataRequest& request, std::unique_ptr<ChunkPacket> packet)
152 access::DataRequest access_request;
153 access_request.source_addr =
request.source;
154 access_request.destination_addr =
request.destination;
155 access_request.ether_type =
request.ether_type;
156 access_request.access_category = map_profile_onto_ac(
request.dcc_profile);
158 m_packet_transmit_hook(access_request.access_category, packet.get());
159 m_access.request(access_request, std::move(packet));
164 m_packet_drop_hook = std::move(cb);
169 m_packet_transmit_hook = std::move(cb);
174 m_queue_length = length;
182 schedule_trigger(*next);
virtual void cancel(const void *scope)=0
virtual Clock::time_point now() const =0
virtual void schedule(Clock::time_point tp, const Callback &cb, const void *scope=nullptr)=0
FlowControl(Runtime &, TransmitRateControl &, access::Interface &)
void set_packet_drop_hook(PacketDropHook::callback_type &&)
void queue_length(std::size_t length)
void request(const DataRequest &, std::unique_ptr< ChunkPacket >) override
void set_packet_transmit_hook(PacketTransmitHook::callback_type &&)
virtual void notify(const Transmission &tx)=0
virtual Clock::duration delay(const Transmission &tx)=0