Vanetza
 
Loading...
Searching...
No Matches
flow_control.cpp
1#include "data_request.hpp"
2#include "flow_control.hpp"
3#include "mapping.hpp"
4#include "transmit_rate_control.hpp"
5#include <vanetza/access/data_request.hpp>
6#include <vanetza/access/interface.hpp>
7#include <vanetza/common/runtime.hpp>
8#include <algorithm>
9
10namespace vanetza
11{
12namespace dcc
13{
14
16 m_runtime(runtime), m_trc(trc), m_access(ifc), m_queue_length(0)
17{
18}
19
20FlowControl::~FlowControl()
21{
22 m_runtime.cancel(this);
23}
24
25void FlowControl::request(const DataRequest& request, std::unique_ptr<ChunkPacket> packet)
26{
27 drop_expired();
28
29 const TransmissionLite transmission { request.dcc_profile, packet->size() };
30 if (transmit_immediately(transmission)) {
31 m_trc.notify(transmission);
32 transmit(request, std::move(packet));
33 } else {
34 enqueue(request, std::move(packet));
35 }
36}
37
38void FlowControl::trigger()
39{
40 drop_expired();
41 auto transmission = dequeue();
42 if (transmission) {
43 m_trc.notify(*transmission);
44 transmit(transmission->request, std::move(transmission->packet));
45 }
46
47 PendingTransmission* next = next_transmission();
48 if (next) {
49 schedule_trigger(*next);
50 }
51}
52
53void FlowControl::schedule_trigger(const Transmission& tx)
54{
55 auto callback_delay = m_trc.delay(tx);
56 m_runtime.schedule(callback_delay, std::bind(&FlowControl::trigger, this), this);
57}
58
59void FlowControl::enqueue(const DataRequest& request, std::unique_ptr<ChunkPacket> packet)
60{
61 const bool first_packet = empty();
62 const auto ac = map_profile_onto_ac(request.dcc_profile);
63 auto expiry = m_runtime.now() + request.lifetime;
64 while (m_queue_length > 0 && m_queues[ac].size() >= m_queue_length) {
65 m_queues[ac].pop_front();
66 m_packet_drop_hook(ac, packet.get());
67 }
68 m_queues[ac].emplace_back(expiry, request, std::move(packet));
69
70 if (first_packet) {
71 schedule_trigger(m_queues[ac].back());
72 }
73}
74
75boost::optional<FlowControl::PendingTransmission> FlowControl::dequeue()
76{
77 boost::optional<PendingTransmission> transmission;
78 Queue* queue = next_queue();
79 if (queue) {
80 transmission = std::move(queue->front());
81 queue->pop_front();
82 }
83
84 return transmission;
85}
86
87bool FlowControl::transmit_immediately(const Transmission& transmission) const
88{
89 const auto ac = map_profile_onto_ac(transmission.profile());
90
91 // is there any packet enqueued with equal or higher priority?
92 bool contention = false;
93 for (auto it = m_queues.cbegin(); it != m_queues.end(); ++it) {
94 if (it->first >= ac && !it->second.empty()) {
95 contention = true;
96 break;
97 }
98 }
99
100 return !contention && m_trc.delay(transmission) == Clock::duration::zero();
101}
102
103bool FlowControl::empty() const
104{
105 return std::all_of(m_queues.cbegin(), m_queues.cend(),
106 [](const std::pair<access::AccessCategory, const Queue&>& kv) {
107 return kv.second.empty();
108 });
109}
110
111FlowControl::Queue* FlowControl::next_queue()
112{
113 Queue* next = nullptr;
114 Clock::duration min_delay = Clock::duration::max();
115
116 for (auto& kv : m_queues) {
117 Queue& queue = kv.second;
118 if (!queue.empty()) {
119 const auto delay = m_trc.delay(queue.front());
120 if (delay < min_delay) {
121 min_delay = delay;
122 next = &queue;
123 }
124 }
125 }
126 return next;
127}
128
129FlowControl::PendingTransmission* FlowControl::next_transmission()
130{
131 Queue* queue = next_queue();
132 return queue ? &queue->front() : nullptr;
133}
134
135void FlowControl::drop_expired()
136{
137 for (auto& kv : m_queues) {
138 access::AccessCategory ac = kv.first;
139 Queue& queue = kv.second;
140 queue.remove_if([this, ac](const PendingTransmission& transmission) {
141 bool drop = transmission.expiry < m_runtime.now();
142 if (drop) {
143 m_packet_drop_hook(ac, transmission.packet.get());
144 }
145 return drop;
146 });
147 }
148}
149
150void FlowControl::transmit(const DataRequest& request, std::unique_ptr<ChunkPacket> packet)
151{
152 access::DataRequest access_request;
153 access_request.source_addr = request.source;
154 access_request.destination_addr = request.destination;
155 access_request.ether_type = request.ether_type;
156 access_request.access_category = map_profile_onto_ac(request.dcc_profile);
157
158 m_packet_transmit_hook(access_request.access_category, packet.get());
159 m_access.request(access_request, std::move(packet));
160}
161
162void FlowControl::set_packet_drop_hook(PacketDropHook::callback_type&& cb)
163{
164 m_packet_drop_hook = std::move(cb);
165}
166
167void FlowControl::set_packet_transmit_hook(PacketTransmitHook::callback_type&& cb)
168{
169 m_packet_transmit_hook = std::move(cb);
170}
171
172void FlowControl::queue_length(std::size_t length)
173{
174 m_queue_length = length;
175}
176
178{
179 PendingTransmission* next = next_transmission();
180 if (next) {
181 m_runtime.cancel(this);
182 schedule_trigger(*next);
183 }
184}
185
186} // namespace dcc
187} // namespace vanetza
virtual void cancel(const void *scope)=0
virtual Clock::time_point now() const =0
virtual void schedule(Clock::time_point tp, const Callback &cb, const void *scope=nullptr)=0
FlowControl(Runtime &, TransmitRateControl &, access::Interface &)
void set_packet_drop_hook(PacketDropHook::callback_type &&)
void queue_length(std::size_t length)
void request(const DataRequest &, std::unique_ptr< ChunkPacket >) override
void set_packet_transmit_hook(PacketTransmitHook::callback_type &&)
virtual void notify(const Transmission &tx)=0
virtual Clock::duration delay(const Transmission &tx)=0