Vanetza
 
Loading...
Searching...
No Matches
cbf_packet_buffer.cpp
1#include <vanetza/common/runtime.hpp>
2#include <vanetza/geonet/address.hpp>
3#include <vanetza/geonet/cbf_counter.hpp>
4#include <vanetza/geonet/cbf_packet_buffer.hpp>
5#include <vanetza/geonet/pdu.hpp>
6#include <vanetza/units/time.hpp>
7#include <cassert>
8#include <iterator>
9
10namespace vanetza
11{
12namespace geonet
13{
14
15CbfPacket::CbfPacket(PendingPacket<GbcPdu>&& packet, const MacAddress& sender) :
16 m_packet(std::move(packet)), m_sender(sender)
17{
18}
19
20CbfPacket::CbfPacket(PendingPacket<GbcPdu, const MacAddress&>&& packet, const MacAddress& sender) :
21 m_packet(PendingPacket<GbcPdu>(std::move(packet), cBroadcastMacAddress)), m_sender(sender)
22{
23}
24
25const MacAddress& CbfPacket::sender() const
26{
27 return m_sender;
28}
29
30const Address& CbfPacket::source() const
31{
32 return m_packet.pdu().extended().source_position.gn_addr;
33}
34
35SequenceNumber CbfPacket::sequence_number() const
36{
37 return m_packet.pdu().extended().sequence_number;
38}
39
40Clock::duration CbfPacket::reduce_lifetime(Clock::duration d)
41{
42 return m_packet.reduce_lifetime(d);
43}
44
45std::size_t CbfPacket::length() const
46{
47 return m_packet.length();
48}
49
50
51CbfPacketBuffer::CbfPacketBuffer(Runtime& rt, TimerCallback cb, std::unique_ptr<CbfCounter> cnt, std::size_t bytes) :
52 m_runtime(rt), m_counter(std::move(cnt)),
53 m_capacity_bytes(bytes), m_stored_bytes(0),
54 m_timer_callback(cb)
55{
56}
57
58CbfPacketBuffer::~CbfPacketBuffer()
59{
60 m_runtime.cancel(this);
61}
62
63bool CbfPacketBuffer::remove(const Identifier& id)
64{
65 bool packet_dropped = false;
66
67 auto& id_map = m_timers.right;
68 auto found = id_map.find(id);
69 if (found != id_map.end()) {
70 auto& packet = found->info;
71 m_stored_bytes -= packet->length();
72 m_counter->remove(id);
73 m_packets.erase(packet);
74 remove_timer(m_timers.project_left(found));
75 packet_dropped = true;
76 }
77
78 assert(m_packets.size() == m_timers.size());
79 return packet_dropped;
80}
81
82void CbfPacketBuffer::remove_timer(typename timer_bimap::left_map::iterator timer_it)
83{
84 auto& timer_map = m_timers.left;
85 auto successor = timer_map.erase(timer_it);
86 if (successor == timer_map.begin() && !timer_map.empty()) {
87 // erased timer was scheduled one, reschedule timer trigger
89 }
90}
91
92void CbfPacketBuffer::add(CbfPacket&& packet, Clock::duration timeout)
93{
94 if(timeout <= Clock::duration::zero()) return;
95 m_stored_bytes += packet.length();
96 const auto first_timer = m_timers.left.begin();
97
98 // do head drop if necessary
99 while (m_stored_bytes > m_capacity_bytes && !m_packets.empty()) {
100 m_stored_bytes -= m_packets.front().length();
101 const auto id = identifier(m_packets.front());
102 m_timers.right.erase(id);
103 m_counter->remove(id);
104 m_packets.pop_front();
105 }
106
107 Timer timer = { m_runtime, timeout };
108 const Identifier id = identifier(packet);
109 m_packets.emplace_back(std::move(packet));
110 using timer_value = timer_bimap::value_type;
111 auto insertion = m_timers.insert(timer_value { timer, id, std::prev(m_packets.end()) });
112 if (!insertion.second) {
113 m_stored_bytes -= m_packets.back().length();
114 m_packets.pop_back();
115 } else {
116 m_counter->add(id);
117 }
118
119 // first expirying timer has changed (head drop or added packet)
120 if (m_timers.left.begin() != first_timer) {
122 }
123 assert(m_packets.size() == m_timers.size());
124}
125
126void CbfPacketBuffer::update(const Identifier& id, Clock::duration timeout)
127{
128 auto& id_map = m_timers.right;
129 auto found = id_map.find(id);
130 if (found != id_map.end()) {
131 const Timer& timer = found->second;
132 CbfPacket& cbf_packet = *found->info;
133 reduce_lifetime(timer, cbf_packet);
134 id_map.replace_data(found, Timer { m_runtime, timeout});
135 m_counter->increment(id);
136 }
137}
138
139boost::optional<CbfPacket> CbfPacketBuffer::fetch(const Identifier& id)
140{
141 boost::optional<CbfPacket> packet;
142
143 auto& id_map = m_timers.right;
144 auto found = id_map.find(id);
145 if (found != id_map.end()) {
146 const Timer& timer = found->second;
147 CbfPacket& cbf_packet = *found->info;
148 bool valid_packet = reduce_lifetime(timer, cbf_packet);
149 m_stored_bytes -= cbf_packet.length();
150 if (valid_packet) {
151 packet.emplace(std::move(cbf_packet));
152 }
153 m_counter->remove(id);
154 m_packets.erase(found->info);
155 remove_timer(m_timers.project_left(found));
156 }
157
158 return packet;
159}
160
161const CbfPacket* CbfPacketBuffer::find(const Identifier& id) const
162{
163 const auto& id_map = m_timers.right;
164 auto found = id_map.find(id);
165 return found != id_map.end() ? &(*found->info) : nullptr;
166}
167
168std::size_t CbfPacketBuffer::counter(const Identifier& id) const
169{
170 return m_counter->counter(id);
171}
172
174{
175 // fetch all expired timers
176 const Timer now { m_runtime, std::chrono::seconds(0) };
177 auto end = m_timers.left.upper_bound(now);
178 for (auto it = m_timers.left.begin(); it != end;) {
179 // reduce LT by queuing time
180 const Timer& timer = it->first;
181 CbfPacket& packet = *it->info;
182 bool valid_packet = reduce_lifetime(timer, packet);
183 m_stored_bytes -= packet.length();
184 if (valid_packet) {
185 m_timer_callback(std::move(packet).packet());
186 }
187
188 m_counter->remove(it->second);
189 m_packets.erase(it->info);
190 it = m_timers.left.erase(it);
191 }
192
193 // schedule timer if not empty
194 if (!m_timers.empty()) {
196 }
197}
198
199bool CbfPacketBuffer::reduce_lifetime(const Timer& timer, CbfPacket& packet) const
200{
201 const auto queuing_time = m_runtime.now() - timer.start;
202 return packet.reduce_lifetime(queuing_time) > Clock::duration::zero();
203}
204
206{
207 assert(!m_timers.empty());
208 m_runtime.cancel(this);
209 Runtime::Callback cb = [this](Clock::time_point) { flush(); };
210 m_runtime.schedule(m_timers.left.begin()->first.expiry, cb, this);
211}
212
213
214CbfPacketBuffer::Timer::Timer(const Runtime& rt, Clock::duration timeout) :
215 expiry(rt.now() + timeout), start(rt.now())
216{
217}
218
219bool CbfPacketBuffer::Timer::operator<(const Timer& other) const
220{
221 return this->expiry < other.expiry;
222}
223
224} // namespace geonet
225} // namespace vanetza
226
virtual void cancel(const void *scope)=0
virtual Clock::time_point now() const =0
virtual void schedule(Clock::time_point tp, const Callback &cb, const void *scope=nullptr)=0
std::size_t counter(const Identifier &packet) const
boost::optional< CbfPacket > fetch(const Identifier &id)
void remove_timer(typename timer_bimap::left_map::iterator)
bool remove(const Identifier &id)
void update(const Identifier &id, Clock::duration timeout)
void add(CbfPacket &&packet, Clock::duration timeout)
const CbfPacket * find(const Identifier &id) const
bool reduce_lifetime(const Timer &, CbfPacket &) const
Clock::duration reduce_lifetime(Clock::duration d)
STL namespace.