-
Notifications
You must be signed in to change notification settings - Fork 80
/
node_behaviour.cpp
1221 lines (1012 loc) · 66.6 KB
/
node_behaviour.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#include "nmos/node_behaviour.h"
#include "pplx/pplx_utils.h" // for pplx::complete_at
#include "cpprest/http_client.h"
#include "cpprest/json_storage.h"
#include "mdns/service_advertiser.h"
#include "mdns/service_discovery.h"
#include "nmos/api_downgrade.h"
#include "nmos/api_utils.h" // for nmos::type_from_resourceType
#include "nmos/authorization_state.h"
#include "nmos/client_utils.h"
#include "nmos/mdns.h"
#include "nmos/model.h"
#include "nmos/query_utils.h"
#include "nmos/random.h"
#include "nmos/rational.h"
#include "nmos/slog.h"
#include "nmos/thread_utils.h" // for wait_until, reverse_lock_guard
#include "nmos/version.h"
namespace nmos
{
// forward declarations of the implementation helpers used by the node behaviour state machine,
// so that they can be called before the point at which they are defined
namespace details
{
// the common implementation that the public nmos::node_behaviour_thread overloads forward to
void node_behaviour_thread(nmos::model& model, load_ca_certificates_handler load_ca_certificates, registration_handler registration_changed, nmos::experimental::get_authorization_bearer_token_handler get_authorization_bearer_token, mdns::service_advertiser& advertiser, mdns::service_discovery& discovery, slog::base_gate& gate);
// registered operation
// (initial_registration takes self_id by non-const reference, registered_operation by const reference)
void initial_registration(nmos::id& self_id, nmos::model& model, const nmos::id& grain_id, load_ca_certificates_handler load_ca_certificates, nmos::experimental::get_authorization_bearer_token_handler get_authorization_bearer_token, slog::base_gate& gate);
void registered_operation(const nmos::id& self_id, nmos::model& model, const nmos::id& grain_id, load_ca_certificates_handler load_ca_certificates, registration_handler registration_changed, nmos::experimental::get_authorization_bearer_token_handler get_authorization_bearer_token, slog::base_gate& gate);
// peer to peer operation
void peer_to_peer_operation(nmos::model& model, const nmos::id& grain_id, mdns::service_discovery& discovery, mdns::service_advertiser& advertiser, slog::base_gate& gate);
// service advertisement/discovery
void advertise_node_service(const nmos::base_model& model, mdns::service_advertiser& advertiser);
// the cancellation token is optional, defaulting to a token that (by its name) is never cancelled
bool discover_registration_services(nmos::base_model& model, mdns::service_discovery& discovery, slog::base_gate& gate, const pplx::cancellation_token& token = pplx::cancellation_token::none());
bool has_discovered_registration_services(const nmos::model& model);
// a (fake) subscription to keep track of all resource events
nmos::resource make_node_behaviour_subscription(const nmos::id& id);
nmos::resource make_node_behaviour_grain(const nmos::id& id, const nmos::id& subscription_id);
}
// Runs the node behaviour with the default DNS-SD implementation, i.e. with an advertiser
// and a discovery object that are created by, and live only as long as, this call.
// Callbacks from this function are called with the model locked, and may read or write directly to the model.
void node_behaviour_thread(nmos::model& model, load_ca_certificates_handler load_ca_certificates, registration_handler registration_changed, nmos::experimental::get_authorization_bearer_token_handler get_authorization_bearer_token, slog::base_gate& gate_)
{
    // wrap the caller's gate so that the node_behaviour category is stashed with what is logged through it
    nmos::details::omanip_gate behaviour_gate(gate_, nmos::stash_category(nmos::categories::node_behaviour));

    // the guard is declared straight after the advertiser it refers to, so it is destroyed before it
    mdns::service_advertiser dns_sd_advertiser(behaviour_gate);
    mdns::service_advertiser_guard dns_sd_advertiser_guard(dns_sd_advertiser);
    mdns::service_discovery dns_sd_discovery(behaviour_gate);

    details::node_behaviour_thread(
        model,
        std::move(load_ca_certificates),
        std::move(registration_changed),
        std::move(get_authorization_bearer_token),
        dns_sd_advertiser,
        dns_sd_discovery,
        behaviour_gate);
}
// Runs the node behaviour with the default DNS-SD implementation and no authorization bearer token callback.
// Callbacks from this function are called with the model locked, and may read or write directly to the model.
void node_behaviour_thread(nmos::model& model, load_ca_certificates_handler load_ca_certificates, registration_handler registration_changed, slog::base_gate& gate_)
{
    // the overload taking a bearer token callback performs exactly the same set-up (category gate,
    // advertiser, advertiser guard, discovery), so forward to it with an empty callback
    node_behaviour_thread(model, std::move(load_ca_certificates), std::move(registration_changed), {}, gate_);
}
// Runs the node behaviour with the DNS-SD advertiser and discovery objects supplied by the caller.
// Callbacks from this function are called with the model locked, and may read or write directly to the model.
void node_behaviour_thread(nmos::model& model, load_ca_certificates_handler load_ca_certificates, registration_handler registration_changed, nmos::experimental::get_authorization_bearer_token_handler get_authorization_bearer_token, mdns::service_advertiser& advertiser, mdns::service_discovery& discovery, slog::base_gate& gate_)
{
    // wrap the caller's gate so that the node_behaviour category is stashed with what is logged through it
    nmos::details::omanip_gate behaviour_gate(gate_, nmos::stash_category(nmos::categories::node_behaviour));

    details::node_behaviour_thread(
        model,
        std::move(load_ca_certificates),
        std::move(registration_changed),
        std::move(get_authorization_bearer_token),
        advertiser,
        discovery,
        behaviour_gate);
}
// Runs the node behaviour with the DNS-SD advertiser and discovery objects supplied by the caller,
// and no authorization bearer token callback.
void node_behaviour_thread(nmos::model& model, load_ca_certificates_handler load_ca_certificates, registration_handler registration_changed, mdns::service_advertiser& advertiser, mdns::service_discovery& discovery, slog::base_gate& gate)
{
    // same steps as the overload that takes a bearer token callback: wrap the gate with the
    // node_behaviour category, then hand over to the implementation with an empty callback
    nmos::details::omanip_gate behaviour_gate(gate, nmos::stash_category(nmos::categories::node_behaviour));

    details::node_behaviour_thread(
        model,
        std::move(load_ca_certificates),
        std::move(registration_changed),
        {},
        advertiser,
        discovery,
        behaviour_gate);
}
// uses the default DNS-SD implementation
// Convenience overload with neither a registration_changed callback nor an authorization
// bearer token callback; both are passed on as empty handlers.
void node_behaviour_thread(nmos::model& model, load_ca_certificates_handler load_ca_certificates, slog::base_gate& gate)
{
    // load_ca_certificates is taken by value and is not used again here, so it is moved on
    // (as the other overloads do) instead of being copied a second time
    node_behaviour_thread(model, std::move(load_ca_certificates), {}, {}, gate);
}
// uses the specified DNS-SD implementation
// Convenience overload with neither a registration_changed callback nor an authorization
// bearer token callback; both are passed on as empty handlers.
void node_behaviour_thread(nmos::model& model, load_ca_certificates_handler load_ca_certificates, mdns::service_advertiser& advertiser, mdns::service_discovery& discovery, slog::base_gate& gate)
{
    // load_ca_certificates is taken by value and is not used again here, so it is moved on
    // (as the other overloads do) instead of being copied a second time
    node_behaviour_thread(model, std::move(load_ca_certificates), {}, {}, advertiser, discovery, gate);
}
// The implementation of the node behaviour: advertises the Node API service, inserts the (fake)
// subscription and grain used to track resource events, and then runs a state machine that moves between
// discovery, initial registration, registered operation and peer-to-peer operation until model.shutdown is set.
// load_ca_certificates, registration_changed and get_authorization_bearer_token are only passed on to the
// registration functions; the advertiser and discovery objects are used for DNS-SD
void details::node_behaviour_thread(nmos::model& model, load_ca_certificates_handler load_ca_certificates, registration_handler registration_changed, nmos::experimental::get_authorization_bearer_token_handler get_authorization_bearer_token, mdns::service_advertiser& advertiser, mdns::service_discovery& discovery, slog::base_gate& gate)
{
// The possible states of node behaviour represent the two primary modes (registered operation and peer-to-peer operation)
// and a few hopefully ephemeral states as the node works through the "Standard Registration Sequences".
// See https://specs.amwa.tv/is-04/releases/v1.2.0/docs/4.1._Behaviour_-_Registration.html
// note that some enumerators share their names with functions in details, which is why those functions are called with explicit qualification below
enum
{
initial_discovery,
initial_registration,
registered_operation,
rediscovery,
peer_to_peer_operation
} mode = initial_discovery;
// "1. A Node is connected to the network"
// "2. The Node runs an HTTP accessible Node API."
// These should have happened by now...
// "3. The Node produces an mDNS advertisement of type '_nmos-node._tcp' in the '.local' domain as specified in Node API."
details::advertise_node_service(model, advertiser);
// "If the chosen Registration API does not respond correctly at any time, another Registration API should be selected from the discovered list."
// See https://specs.amwa.tv/is-04/releases/v1.2.0/docs/3.1._Discovery_-_Registered_Operation.html
// hmm, it seems inefficient to store the discovered list in settings, when it's currently only used by this thread, but TR-1001-1:2018 insists
// "Media Nodes should, through product-specific means, provide a status parameter indicating which registration service is currently in use."
with_write_lock(model.mutex, [&model] { model.settings[nmos::fields::registration_services] = web::json::value::array(); });
// the random engine used to pick a delay between zero and the current backoff limit before each discovery retry
nmos::details::seed_generator discovery_backoff_seeder;
std::default_random_engine discovery_backoff_engine(discovery_backoff_seeder);
// the current backoff limit in seconds; zero means that discovery is tried without waiting
double discovery_backoff = 0;
// a (fake) subscription to keep track of all resource events
auto grain_id = nmos::make_id();
with_write_lock(model.mutex, [&model, &grain_id]
{
auto subscription_id = nmos::make_id();
insert_resource(model.node_resources, details::make_node_behaviour_subscription(subscription_id));
insert_resource(model.node_resources, details::make_node_behaviour_grain(grain_id, subscription_id));
});
// there should be exactly one node resource, but it may not have been added yet
// and during a controlled shutdown it may be removed; it is therefore identified
// during initial registration for use in registered operation
nmos::id self_id;
// continue until the server is being shut down
for (;;)
{
if (with_read_lock(model.mutex, [&] { return model.shutdown; })) break;
switch (mode)
{
case initial_discovery:
case rediscovery:
if (0 != discovery_backoff)
{
auto lock = model.read_lock();
const auto random_backoff = std::uniform_real_distribution<>(0, discovery_backoff)(discovery_backoff_engine);
slog::log<slog::severities::more_info>(gate, SLOG_FLF) << "Waiting to retry Registration API discovery for about " << std::fixed << std::setprecision(3) << random_backoff << " seconds (current backoff limit: " << discovery_backoff << " seconds)";
model.wait_for(lock, std::chrono::milliseconds(std::chrono::milliseconds::rep(1000 * random_backoff)), [&] { return model.shutdown; });
// this break only leaves the switch; the shutdown check at the top of the loop then ends the thread
if (model.shutdown) break;
}
// "4. The Node performs a DNS-SD browse for services of type '_nmos-registration._tcp' as specified."
if (details::discover_registration_services(model, discovery, gate))
{
// after initial discovery the node must register itself; after rediscovery it goes straight back to registered operation
mode = initial_discovery == mode ? initial_registration : registered_operation;
// "Should a 5xx error be encountered when interacting with all discoverable Registration APIs it is recommended that clients
// implement an exponential backoff algorithm in their next attempts until a non-5xx response code is received."
// See https://specs.amwa.tv/is-04/releases/v1.2.0/docs/4.1._Behaviour_-_Registration.html#node-encounters-http-500-or-other-5xx-inability-to-connect-or-a-timeout-on-heartbeat
auto lock = model.read_lock();
// multiply the backoff limit by the configured factor, keeping it between the configured minimum and maximum
// (so a limit of zero becomes the minimum)
discovery_backoff = (std::min)((std::max)((double)nmos::fields::discovery_backoff_min(model.settings), discovery_backoff * nmos::fields::discovery_backoff_factor(model.settings)), (double)nmos::fields::discovery_backoff_max(model.settings));
}
else
{
// "If no Registration APIs are advertised on a network, the Node should assume peer to peer operation unless configured otherwise."
mode = peer_to_peer_operation;
}
break;
case initial_registration:
// "5. The Node registers itself with the Registration API by taking the object it holds under the Node API's /self resource and POSTing this to the Registration API."
details::initial_registration(self_id, model, grain_id, load_ca_certificates, get_authorization_bearer_token, gate);
// the list of registration services in settings is used as the outcome of the attempt:
// if any remain, move on to registered operation without backoff, otherwise go back to discovery
if (details::has_discovered_registration_services(model))
{
mode = registered_operation;
discovery_backoff = 0;
}
else
{
mode = initial_discovery;
}
break;
case registered_operation:
// "6. The Node persists itself in the registry by issuing heartbeats."
// "7. The Node registers its other resources (from /devices, /sources etc) with the Registration API."
details::registered_operation(self_id, model, grain_id, load_ca_certificates, registration_changed, get_authorization_bearer_token, gate);
if (details::has_discovered_registration_services(model))
{
// "A 404 error on heartbeat indicates that the Node performing the heartbeat is not known to the Registration API. [The] Node must re-register each of its resources with the Registration API in order."
mode = initial_registration;
discovery_backoff = 0;
}
else
{
// "Should no further Registration APIs be available or TTLs on advertised services expired, a re-query may be performed."
mode = rediscovery;
}
break;
case peer_to_peer_operation:
details::peer_to_peer_operation(model, grain_id, discovery, advertiser, gate);
// stay in peer-to-peer operation unless a Registration API has now been discovered
if (details::has_discovered_registration_services(model))
{
mode = initial_registration;
}
break;
}
}
}
// service advertisement/discovery
namespace details
{
// Registers the node's addresses and its Node API service (with the required TXT records)
// with the advertiser, unless the configured priority says the node should not be advertised
void advertise_node_service(mdns::service_advertiser& advertiser, const nmos::settings& settings)
{
    // no_priority allows the node to run unadvertised
    const bool unadvertised = nmos::service_priorities::no_priority == nmos::fields::pri(settings);
    if (unadvertised) return;

    nmos::experimental::register_addresses(advertiser, settings);
    nmos::experimental::register_service(advertiser, nmos::service_types::node, settings);
}
// Advertises the Node API service based on the model's current settings
void advertise_node_service(const nmos::base_model& model, mdns::service_advertiser& advertiser)
{
    // copy the settings while holding the read lock; the advertiser is then called with the copy
    const auto settings_snapshot = with_read_lock(model.mutex, [&model] { return model.settings; });
    advertise_node_service(advertiser, settings_snapshot);
}
// Builds the URI of the fallback Registration API from the registry address, port and
// version in settings; returns an empty URI if settings has no registry address
web::uri get_registration_service(const nmos::settings& settings)
{
    if (!settings.has_field(nmos::fields::registry_address))
    {
        return web::uri();
    }

    web::uri_builder fallback;
    fallback.set_scheme(nmos::http_scheme(settings));
    fallback.set_host(nmos::fields::registry_address(settings));
    fallback.set_port(nmos::fields::registration_port(settings));
    fallback.set_path(U("/x-nmos/registration/") + nmos::fields::registry_version(settings));
    return fallback.to_uri();
}
// Queries DNS Service Discovery for any Registration API based on settings, falling back to the
// registration service configured in settings (if any) when none is discovered.
// The resulting list is stored in the model's settings (registration_services) and the model is notified.
// Returns true if there is at least one Registration API to use.
bool discover_registration_services(nmos::base_model& model, mdns::service_discovery& discovery, slog::base_gate& gate, const pplx::cancellation_token& token)
{
    slog::log<slog::severities::more_info>(gate, SLOG_FLF) << "Trying Registration API discovery";

    // the discovery task is started while the settings are locked for reading...
    auto discovery_task = with_read_lock(model.mutex, [&]
    {
        auto& settings = model.settings;

        // a highest_pri of no_priority means DNS-SD is not tried at all
        if (nmos::service_priorities::no_priority == nmos::fields::highest_pri(settings))
        {
            return pplx::task_from_result(std::list<web::uri>{});
        }

        slog::log<slog::severities::info>(gate, SLOG_FLF) << "Attempting discovery of a Registration API in domain: " << nmos::get_domain(settings);
        return nmos::experimental::resolve_service(discovery, nmos::service_types::registration, settings, token);
    });
    // ...but waited for only after the lock has been released
    auto found_services = discovery_task.get();

    with_write_lock(model.mutex, [&]
    {
        if (found_services.empty())
        {
            slog::log<slog::severities::warning>(gate, SLOG_FLF) << "Did not discover a suitable Registration API via DNS-SD";

            const auto fallback = get_registration_service(model.settings);
            if (!fallback.is_empty())
            {
                found_services.push_back(fallback);
            }
        }
        else
        {
            slog::log<slog::severities::info>(gate, SLOG_FLF) << "Discovered " << found_services.size() << " Registration API(s)";
        }

        if (!found_services.empty())
        {
            // one line per Registration API, in the order in which they are stored
            const auto list_services = [&](slog::log_statement& s)
            {
                for (auto& service : found_services)
                {
                    s << '\n' << service.to_string();
                }
            };
            slog::log<slog::severities::more_info>(gate, SLOG_FLF) << "Using the Registration API(s):" << slog::log_manip(list_services);
        }

        // publish the list (possibly empty) as an array of URI strings
        const auto to_string = [](const web::uri& u) { return u.to_string(); };
        model.settings[nmos::fields::registration_services] = web::json::value_from_elements(found_services | boost::adaptors::transformed(to_string));
        model.notify();
    });

    return !found_services.empty();
}
// Reports whether the list of Registration APIs held in settings is empty
bool empty_registration_services(const nmos::settings& settings)
{
    const auto& services = nmos::fields::registration_services(settings);
    return web::json::empty(services);
}
// Reports whether the model's settings currently hold at least one Registration API;
// the settings are read under the model's read lock
bool has_discovered_registration_services(const nmos::model& model)
{
    const bool none_left = with_read_lock(model.mutex, [&model] { return empty_registration_services(model.settings); });
    return !none_left;
}
// "The Node selects a Registration API to use based on the priority"
// Returns the first entry of the list of Registration APIs held in settings, as a URI;
// no check is made here that the list is non-empty
web::uri top_registration_service(const nmos::settings& settings)
{
    const auto& services = nmos::fields::registration_services(settings);
    const auto& first_service = web::json::front(services);
    return web::uri(first_service.as_string());
}
// "If the chosen Registration API does not respond correctly at any time,
// another Registration API should be selected from the discovered list."
// Removes the first entry of the list of Registration APIs held in settings
void pop_registration_service(nmos::settings& settings)
{
    auto&& services = nmos::fields::registration_services(settings);
    web::json::pop_front(services);

    // "TTLs on advertised services" may have expired too, so should cache time-to-live values
    // using DNSServiceQueryRecord instead of DNSServiceResolve?
}
}
// a (fake) subscription to keep track of all resource events
namespace details
{
// the resource path of the (fake) subscription; default-constructed, i.e. an empty string
const utility::string_t node_behaviour_resource_path;
// the topic used for the (fake) subscription's grain, and when identifying the resource in one of its events;
// the resource path followed by "/"
const utility::string_t node_behaviour_topic = node_behaviour_resource_path + U("/");
// Makes the (fake) subscription resource, with the specified id, that the node behaviour
// uses to keep track of all resource events
nmos::resource make_node_behaviour_subscription(const nmos::id& id)
{
    web::json::value subscription;

    subscription[nmos::fields::id] = web::json::value::string(id);
    // no throttling used at present
    subscription[nmos::fields::max_update_rate_ms] = 0;
    // not to be deleted by someone else
    subscription[nmos::fields::persist] = web::json::value::boolean(false);
    subscription[nmos::fields::resource_path] = web::json::value::string(node_behaviour_resource_path);
    subscription[nmos::fields::params] = web::json::value::object();

    // no ws_href since subscriptions are inaccessible on the Node API anyway

    // the subscription version must be updated for the Registration API version (see initial_registration)
    return nmos::resource{ nmos::is04_versions::v1_3, nmos::types::subscription, std::move(subscription), true };
}
// Makes the grain resource, with the specified id, that belongs to the (fake) subscription
// and starts out with an empty list of events
nmos::resource make_node_behaviour_grain(const nmos::id& id, const nmos::id& subscription_id)
{
    web::json::value grain;

    grain[nmos::fields::id] = web::json::value::string(id);
    grain[nmos::fields::subscription_id] = web::json::value::string(subscription_id);

    // the message itself gets a freshly generated id and the node behaviour topic
    auto message = details::make_grain(nmos::make_id(), subscription_id, node_behaviour_topic);
    grain[nmos::fields::message] = std::move(message);

    // the event list is (re)set to an empty array
    nmos::fields::message_grain_data(grain) = web::json::value::array();

    return nmos::resource{ nmos::is04_versions::v1_3, nmos::types::grain, std::move(grain), true };
}
struct node_behaviour_grain_guard
{
node_behaviour_grain_guard(nmos::resources& resources, nmos::resources::iterator grain, web::json::value& events)
: resources(resources)
, grain(grain)
, events(events)
{
// steal the events from the grain
// reset the grain for next time
resources.modify(grain, [&](nmos::resource& grain)
{
using std::swap;
swap(events, nmos::fields::message_grain_data(grain.data));
grain.updated = strictly_increasing_update(resources);
});
}
~node_behaviour_grain_guard()
{
if (0 == events.size()) return;
// restore any remaining events to the grain
resources.modify(grain, [&](nmos::resource& grain)
{
auto& events_storage = web::json::storage_of(events.as_array());
auto& grain_storage = web::json::storage_of(nmos::fields::message_grain_data(grain.data).as_array());
if (!grain_storage.empty())
{
events_storage.insert(events_storage.end(), std::make_move_iterator(grain_storage.begin()), std::make_move_iterator(grain_storage.end()));
grain_storage.clear();
}
using std::swap;
swap(grain_storage, events_storage);
grain.updated = strictly_increasing_update(resources);
});
}
nmos::resources& resources;
nmos::resources::iterator grain;
web::json::value& events;
};
}
// registered operation
namespace details
{
// Makes the body of a Registration API POST request, i.e. an object with the
// resource type name as "type" and the resource representation as "data"
web::json::value make_registration_request_body(const nmos::type& type, const web::json::value& data)
{
    using web::json::value;

    return web::json::value_of({
        { U("type"), value::string(type.name) },
        { U("data"), data }
    });
}
// server-side (5xx) registration error
// thrown by handle_registration_error_conditions (also for client (4xx) errors when it is asked to
// handle those as server errors); this is an empty type that does not derive from std::exception,
// so it can only be caught by its own type or by a catch-all handler
struct registration_service_exception {};
// should be called when an error condition has been identified, because it will always log
// Logs the response's status code and reason phrase as an 'error' for the named operation, then throws
// registration_service_exception if the status code is a server (5xx) error, or is any error status code
// when handle_client_error_as_server_error is true; otherwise returns normally
void handle_registration_error_conditions(const web::http::http_response& response, bool handle_client_error_as_server_error, slog::base_gate& gate, const char* operation)
{
    // "For HTTP codes 400 and upwards, a JSON format response MUST be returned [in which]
    // the 'code' should always match the HTTP status code. 'error' must always be present
    // and in string format. 'debug' may be null if no further debug information is available"
    // See https://specs.amwa.tv/is-04/releases/v1.2.0/docs/2.0._APIs.html#error-codes--responses
    // Especially in the case of client (4xx) errors, logging these would be a good idea, but
    // would necessitate blocking for the response body, and extracting them from the json
    // and dealing with potential errors along the way...

    const auto status = response.status_code();

    // Every case is logged in the same way, as an 'error':
    // - a server error could be regarded as 'severe' (presumably it is for the registry), but since the
    //   node has a strategy to recover, it could also be regarded as only a 'warning'
    // - a client error is trickier, since if it truly indicated a validation failure, it is 'severe', but
    //   there are circumstances described below where it could be regarded as only a 'warning', and
    //   there's no means to distinguish these cases as yet
    // - anything else is a non-error status code, it might even be a successful (2xx) code, but the
    //   calling function didn't expect it
    slog::log<slog::severities::error>(gate, SLOG_FLF) << "Registration " << operation << " error: " << status << " " << response.reason_phrase();

    // "A 500 [or other 5xx] error, inability to connect or a timeout indicates a server side or connectivity issue."
    // See https://specs.amwa.tv/is-04/releases/v1.2.0/docs/4.1._Behaviour_-_Registration.html#node-encounters-http-500-or-other-5xx-inability-to-connect-or-a-timeout-on-heartbeat
    const bool treat_as_server_error = handle_client_error_as_server_error
        ? web::http::is_error_status_code(status)
        : web::http::is_server_error_status_code(status);

    if (treat_as_server_error)
    {
        throw registration_service_exception();
    }

    // Otherwise, nothing more is done, in particular for a client error.
    // "A 400 [or other 4xx] error [in response to a POST] indicates a client error which is likely
    // to be the result of a validation failure identified by the Registration API."
    // See https://specs.amwa.tv/is-04/releases/v1.2.0/docs/4.1._Behaviour_-_Registration.html#node-encounters-http-400-or-other-4xx-on-registration
    // "The same request must not be re-attempted without corrective action being taken first.
    // Error responses as detailed in the APIs documentation may assist with debugging these issues."
    // In an automated system, the best option seems to be to allow the registry-held representation
    // of the Node's resources to become out of sync with the Node's view, and flag this to the user
    // as visibly as possible.
    // Note that a 400 error can also indicate that the super-resource was not found due to recent
    // garbage collection in the Registration API, even when this has not yet been indicated by a
    // 404 error on heartbeat. Unfortunately, this situation cannot easily be distinguished from a
    // validation failure at this time, another reason not to handle 4xx errors like 5xx errors.
    // Similarly, a 404 error in response to a DELETE indicates either that the resource has already
    // been explicitly deleted (i.e. a real error somewhere), or that it was not found due to recent
    // garbage collection as above.
}
// Logs the unexpected response and throws registration_service_exception only for
// server (5xx) errors, i.e. client (4xx) errors are not handled as server errors
void handle_registration_error_conditions(const web::http::http_response& response, slog::base_gate& gate, const char* operation)
{
    const bool handle_client_error_as_server_error = false;
    handle_registration_error_conditions(response, handle_client_error_as_server_error, gate, operation);
}
// Makes the HTTP client configuration for registration requests, i.e. the general client
// configuration with its timeout set to registration_request_max (from settings) seconds
web::http::client::http_client_config make_registration_client_config(const nmos::settings& settings, load_ca_certificates_handler load_ca_certificates, const web::http::oauth2::experimental::oauth2_token& bearer_token, slog::base_gate& gate)
{
    auto client_config = nmos::make_http_client_config(settings, std::move(load_ca_certificates), bearer_token, gate);

    const std::chrono::seconds request_timeout(nmos::fields::registration_request_max(settings));
    client_config.set_timeout(request_timeout);

    return client_config;
}
// Makes the HTTP client configuration for heartbeat requests, i.e. the general client
// configuration with its timeout set to registration_heartbeat_max (from settings) seconds
web::http::client::http_client_config make_heartbeat_client_config(const nmos::settings& settings, load_ca_certificates_handler load_ca_certificates, const web::http::oauth2::experimental::oauth2_token& bearer_token, slog::base_gate& gate)
{
    auto client_config = nmos::make_http_client_config(settings, std::move(load_ca_certificates), bearer_token, gate);

    const std::chrono::seconds heartbeat_timeout(nmos::fields::registration_heartbeat_max(settings));
    client_config.set_timeout(heartbeat_timeout);

    return client_config;
}
// make an asynchronous POST or DELETE request on the Registration API specified by the client for the specified resource event
// The kind of request is chosen from the event type: 'added' makes a creation request, 'modified' or 'sync' an update
// request, and 'removed' a deletion request; for any other event type no request is made.
// Unexpected responses are passed to handle_registration_error_conditions from within the continuations, so
// registration_service_exception may be thrown there (it is expected to surface when the returned task is observed).
// Note that the continuations capture gate by reference, so it needs to outlive the returned task;
// everything else that they use (client, body, id_type, path, token) is captured by copy.
pplx::task<void> request_registration(web::http::client::http_client client, const web::json::value& event, slog::base_gate& gate, const pplx::cancellation_token& token = pplx::cancellation_token::none())
{
const auto& path = event.at(U("path")).as_string();
const auto id_type = get_resource_event_resource(node_behaviour_topic, event);
const auto event_type = get_resource_event_type(event);
// An 'added' event calls for registration creation, i.e. a POST request with a 201 'Created' response (200 'OK' is unexpected)
// A 'removed' event calls for registration deletion, i.e. a DELETE request with a 204 'No Content' response
// A 'modified' event calls for a registration update, i.e. a POST request with a 200 'OK' response (201 'Created' is unexpected)
// A 'sync' event is also an (unnecessary) registration update, i.e. a POST request with a 200 'OK' response
// See https://specs.amwa.tv/is-04/releases/v1.2.0/APIs/RegistrationAPI.html
const bool creation = resource_added_event == event_type;
const bool update = resource_modified_event == event_type || resource_unchanged_event == event_type;
const bool deletion = resource_removed_event == event_type;
if (creation)
{
slog::log<slog::severities::info>(gate, SLOG_FLF) << "Requesting registration creation for " << id_type;
auto body = make_registration_request_body(id_type.second, event.at(U("post")));
// the first continuation deals with an out of sync registration (by deleting it and then repeating the POST);
// the second continuation checks the final response, whichever request it came from
return api_request(client, web::http::methods::POST, U("/resource"), body, gate, token).then([=, &gate](web::http::http_response response) mutable
{
// hmm, when I tried to make this a task-based continuation in order to just return the response_task argument (in most cases)
// the enclosing then call failed to compile
// "On first registration with a Registration API this should result in a '201 Created' HTTP response code.
// If a Node receives a 200 code in this case, a previous record of the Node can be assumed to still exist."
// See https://specs.amwa.tv/is-04/releases/v1.2.0/docs/4.1._Behaviour_-_Registration.html#node-encounters-http-200-on-first-registration
if (web::http::status_codes::Created == response.status_code())
{
// successful registration will be logged by the continuation
return pplx::task_from_result(response);
}
else if (web::http::status_codes::OK == response.status_code() || web::http::status_codes::Conflict == response.status_code())
{
slog::log<slog::severities::warning>(gate, SLOG_FLF) << "Registration out of sync for " << id_type;
slog::log<slog::severities::info>(gate, SLOG_FLF) << "Requesting out of sync registration deletion for " << id_type;
// "In order to avoid the registry-held representation of the Node's resources from being out of sync
// with the Node's view, an HTTP DELETE should be performed in this situation to explicitly clear the
// registry of the Node and any sub-resources."
pplx::task<web::http::http_response> deletion;
if (response.headers().has(web::http::header_names::location))
{
// Location may be a relative (to the request URL) or absolute URL
auto request_uri = web::uri_builder(client.base_uri()).append_path(U("/resource")).to_uri();
auto location_uri = request_uri.resolve_uri(response.headers()[web::http::header_names::location]);
// a separate client, with the same configuration, is made for the resolved Location
auto deletion_client = nmos::details::make_http_client(location_uri, client.client_config());
deletion = api_request(*deletion_client, web::http::methods::DEL, gate, token);
}
else
{
// without a Location header, the DELETE is made on the path taken from the event
deletion = api_request(client, web::http::methods::DEL, U("/resource/") + path, gate, token);
}
return deletion.then([=, &gate](web::http::http_response response) mutable
{
if (web::http::status_codes::NoContent == response.status_code())
{
slog::log<slog::severities::more_info>(gate, SLOG_FLF) << "Registration deleted for " << id_type;
}
else
{
// unless this throws (for a server error), the POST is repeated even though the deletion was not confirmed
handle_registration_error_conditions(response, gate, "deletion");
}
slog::log<slog::severities::info>(gate, SLOG_FLF) << "Re-requesting registration creation for " << id_type;
// "A new Node registration after this point should result in the correct 201 response code."
return api_request(client, web::http::methods::POST, U("/resource"), body, gate, token);
});
}
else
{
// registration errors (4xx, 5xx) will be logged by the continuation
return pplx::task_from_result(response);
}
}).then([=, &gate](web::http::http_response response)
{
if (web::http::status_codes::Created == response.status_code())
{
slog::log<slog::severities::more_info>(gate, SLOG_FLF) << "Registration created for " << id_type;
}
else
{
// after a 400 or other 4xx error in response to creating the registration for the node resource itself
// there's definitely no point adopting registered operation, so handle this case like a server error
const bool initial_registration = nmos::types::node == id_type.second;
handle_registration_error_conditions(response, initial_registration, gate, "creation");
}
});
}
else if (update)
{
slog::log<slog::severities::info>(gate, SLOG_FLF) << "Requesting registration update for " << id_type;
auto body = make_registration_request_body(id_type.second, event.at(U("post")));
return api_request(client, web::http::methods::POST, U("/resource"), body, gate, token).then([=, &gate](web::http::http_response response)
{
if (web::http::status_codes::OK == response.status_code())
{
slog::log<slog::severities::more_info>(gate, SLOG_FLF) << "Registration updated for " << id_type;
}
else
{
handle_registration_error_conditions(response, gate, "update");
}
});
}
else if (deletion)
{
slog::log<slog::severities::info>(gate, SLOG_FLF) << "Requesting registration deletion for " << id_type;
return api_request(client, web::http::methods::DEL, U("/resource/") + path, gate, token).then([=, &gate](web::http::http_response response)
{
if (web::http::status_codes::NoContent == response.status_code())
{
slog::log<slog::severities::more_info>(gate, SLOG_FLF) << "Registration deleted for " << id_type;
}
else
{
handle_registration_error_conditions(response, gate, "deletion");
}
});
}
// probably an error to get here
// (the event type is none of those handled above, so no request is made and nothing is logged)
return pplx::task_from_result();
}
// asynchronously post a single heartbeat for the specified node to the Registration API
// the resulting task yields false only when the registry answers 404 (Not Found), i.e. the node needs to re-register;
// it yields true for 200 (OK) and for any other response that handle_registration_error_conditions doesn't turn into an exception
// exceptions from the request itself (e.g. http_exception) are propagated through the returned task
pplx::task<bool> update_node_health(web::http::client::http_client client, const nmos::id& id, slog::base_gate& gate, const pplx::cancellation_token& token = pplx::cancellation_token::none())
{
    slog::log<slog::severities::too_much_info>(gate, SLOG_FLF) << "Posting registration heartbeat for node: " << id;

    const auto health_path = U("/health/nodes/") + id;

    return api_request(client, web::http::methods::POST, health_path, gate, token).then([=, &gate](pplx::task<web::http::http_response> pending)
    {
        // a task-based continuation is used so that get() can rethrow; may throw http_exception
        auto reply = pending.get();
        const auto status = reply.status_code();

        // the expected case, the node is still present in the registry
        if (web::http::status_codes::OK == status) return true;

        if (web::http::status_codes::NotFound != status)
        {
            handle_registration_error_conditions(reply, gate, "heartbeat");

            // having got past the call above, this isn't a server (5xx) error, so carrying on seems
            // the best option even though it isn't really clear what's going on...
            return true;
        }

        // there is a recovery strategy for a 404, so a 'warning' could be argued for,
        // but it's definitely unexpected, hence it's logged as an 'error'
        slog::log<slog::severities::error>(gate, SLOG_FLF) << "Registration heartbeat error: " << status << " " << reply.reason_phrase();

        // "On encountering this code, a Node must re-register each of its resources with the Registration API in order."
        // See https://specs.amwa.tv/is-04/releases/v1.2.0/docs/4.1._Behaviour_-_Registration.html#node-encounters-http-404-on-heartbeat
        return false;
    }, token);
}
// Attempt to register the node resource itself with a Registration API, trying each of the registration services
// in the model settings in turn until one succeeds, none are left, or the server is shut down.
//
// self_id - output; assigned the id of the node resource taken from the node 'added' event
// model - provides the lock, condition variable, shutdown flag, node resources and settings used here
// grain_id - id of the node behaviour subscription grain; if it (or its super-resource, the subscription)
//   cannot be found in the node resources, the function returns immediately
// load_ca_certificates - passed through to make_registration_client_config
// get_authorization_bearer_token - optional callback; when set, it is called to get the bearer token for the client config
// gate - logging gate
//
// The model write lock is held throughout, except while waiting on the condition variable and while waiting
// for the outstanding request to complete (since the request's continuation also takes the write lock).
//
// NOTE: there is significant similarity between initial_registration and registered_operation,
// so the two are candidates for refactoring into shared code
void initial_registration(nmos::id& self_id, nmos::model& model, const nmos::id& grain_id, load_ca_certificates_handler load_ca_certificates, nmos::experimental::get_authorization_bearer_token_handler get_authorization_bearer_token, slog::base_gate& gate)
{
    slog::log<slog::severities::info>(gate, SLOG_FLF) << "Attempting initial registration";

    auto lock = model.write_lock();
    auto& condition = model.condition;
    auto& shutdown = model.shutdown;
    auto& resources = model.node_resources;

    // nothing can be done without the node behaviour grain and its subscription
    const auto grain = nmos::find_resource(resources, { grain_id, nmos::types::grain });
    if (resources.end() == grain) return;

    const auto subscription = nmos::find_resource(resources, nmos::get_super_resource(*grain));
    if (resources.end() == subscription) return;

    std::unique_ptr<web::http::client::http_client> registration_client;

    bool registration_service_error(false);
    bool node_registered(false);

    web::json::value events;

    // background tasks may read/write the above local state by reference
    pplx::cancellation_token_source cancellation_source;
    pplx::task<void> request = pplx::task_from_result();

    tai most_recent_update{};

    for (;;)
    {
        // wait for the thread to be interrupted because there are resource events (or this is the first time through)
        // or because the node was registered successfully
        // or because an error has been encountered with the selected registration service
        // or because the server is being shut down
        condition.wait(lock, [&]{ return shutdown || registration_service_error || node_registered || most_recent_update < grain->updated; });

        if (registration_service_error)
        {
            // give up on the current registration service and move on to the next one (if any)
            pop_registration_service(model.settings);
            model.notify();
            registration_service_error = false;

            cancellation_source.cancel();
            // wait without the lock since it is also used by the background tasks
            details::reverse_lock_guard<nmos::write_lock> unlock{ lock };
            request.wait();

            // discard the client so that a new one is made for the next registration service
            // and replace the cancelled token source so that new requests aren't cancelled from the outset
            registration_client.reset();
            cancellation_source = pplx::cancellation_token_source();
        }

        if (shutdown || empty_registration_services(model.settings) || node_registered) break;

        // "The Node selects a Registration API to use based on the priority"
        if (!registration_client)
        {
            const auto base_uri = top_registration_service(model.settings);

            // "5. The Node registers itself with the Registration API by taking the object it holds under the Node API's /self resource and POSTing this to the Registration API."

            // "Nodes which support multiple versions simultaneously MUST ensure that all of their resources meet the schemas for each corresponding version of the specification[...]
            // It may be necessary to expose only a limited subset of a Node's resources from lower versioned endpoints."
            // See https://specs.amwa.tv/is-04/releases/v1.2.2/docs/6.0._Upgrade_Path.html#version-translations

            // base uri should be like http://api.example.com/x-nmos/registration/{version}
            const auto registry_version = parse_api_version(web::uri::split_path(base_uri.path()).back());

            // reset the node behaviour subscription for the Registration API version
            resources.modify(subscription, [&resources, &registry_version](nmos::resource& subscription)
            {
                subscription.version = registry_version;
                subscription.updated = strictly_increasing_update(resources);
            });

            // reset the node behaviour subscription grain; if the node resource has already been added to the model then
            // the first event will be a 'sync' event for the node (and if not, there really should be no events at all!)
            resources.modify(grain, [&resources, &registry_version](nmos::resource& grain)
            {
                grain.version = registry_version;
                auto& events = nmos::fields::message_grain_data(grain.data);
                // the node behaviour subscription resource_path and params are currently fixed (see make_node_behaviour_subscription)
                events = make_resource_events(resources, registry_version, U(""), web::json::value::object(), false);
                grain.updated = strictly_increasing_update(resources);
            });

            // an empty (default-constructed) token is used when no bearer token callback has been provided
            const auto bearer_token = get_authorization_bearer_token ? get_authorization_bearer_token() : web::http::oauth2::experimental::oauth2_token{};
            registration_client = nmos::details::make_http_client(base_uri, make_registration_client_config(model.settings, load_ca_certificates, bearer_token, gate));
        }

        events = web::json::value::array();
        // NOTE(review): presumably the guard moves the grain's pending events into the local 'events' for the
        // duration of this scope - confirm against node_behaviour_grain_guard
        node_behaviour_grain_guard guard(resources, grain, events);

        most_recent_update = grain->updated;

        while (0 != events.size())
        {
            if (shutdown || registration_service_error || node_registered) break;

            const auto id_type = get_resource_event_resource(node_behaviour_topic, events.at(0));
            const auto event_type = get_resource_event_type(events.at(0));

            // discard events prior to the node 'added' event (shouldn't generally be necessary?)
            if (!(nmos::types::node == id_type.second && resource_added_event == event_type))
            {
                events.erase(0);
                continue;
            }

            // report the id of the node being registered back to the caller
            self_id = id_type.first;

            slog::log<slog::severities::info>(gate, SLOG_FLF) << "Registering nmos-cpp node with the Registration API at: " << registration_client->base_uri().to_string();

            auto token = cancellation_source.get_token();
            request = details::request_registration(*registration_client, events.at(0), gate, token).then([&](pplx::task<void> finally)
            {
                auto lock = model.write_lock(); // in order to update local state

                try
                {
                    // rethrows any exception from the registration request
                    finally.get();

                    // on success (or an ignored failure), discard the resource event
                    if (0 != events.size())
                    {
                        events.erase(0);
                    }

                    // subsequent events are handled in registered operation
                    node_registered = true;
                }
                catch (const web::http::http_exception& e)
                {
                    slog::log<slog::severities::error>(gate, SLOG_FLF) << "Registration request HTTP error: " << e.what() << " [" << e.error_code() << "]";

                    registration_service_error = true;
                }
                catch (const registration_service_exception&)
                {
                    registration_service_error = true;
                }
            });
            // avoid race condition between condition.notify_all() and request.is_done()
            request.then([&]
            {
                condition.notify_all();
            });

            // wait for the request because interactions with the Registration API /resource endpoint must be sequential
            condition.wait(lock, [&]{ return shutdown || registration_service_error || node_registered || request.is_done(); });
        }
    }

    // make sure the outstanding request (which refers to the local state above) has finished before returning
    cancellation_source.cancel();
    // wait without the lock since it is also used by the background tasks
    details::reverse_lock_guard<nmos::write_lock> unlock{ lock };
    request.wait();
}
void registered_operation(const nmos::id& self_id, nmos::model& model, const nmos::id& grain_id, load_ca_certificates_handler load_ca_certificates, registration_handler registration_changed, nmos::experimental::get_authorization_bearer_token_handler get_authorization_bearer_token, slog::base_gate& gate)
{
slog::log<slog::severities::info>(gate, SLOG_FLF) << "Adopting registered operation";
auto lock = model.write_lock();
auto& condition = model.condition;
auto& shutdown = model.shutdown;
auto& resources = model.node_resources;
const auto grain = nmos::find_resource(resources, { grain_id, nmos::types::grain });
if (resources.end() == grain) return;
std::unique_ptr<web::http::client::http_client> registration_client;
std::unique_ptr<web::http::client::http_client> heartbeat_client;
bool registration_service_error(false);
bool node_registered(false);
bool node_unregistered(false);
web::json::value events;
std::chrono::steady_clock::time_point heartbeat_time;
web::http::oauth2::experimental::oauth2_token registration_bearer_token;
web::http::oauth2::experimental::oauth2_token heartbeat_bearer_token;
// background tasks may read/write the above local state by reference
pplx::cancellation_token_source cancellation_source;
pplx::task<void> request = pplx::task_from_result();
pplx::task<void> heartbeats = pplx::task_from_result();
// "7. The Node registers its other resources (from /devices, /sources etc) with the Registration API."
tai most_recent_update{};
for (;;)
{
// wait for the thread to be interrupted because there are resource events (or this is the first time through)
// or because the node was unregistered (cleanly, or as a result of missed heartbeats)
// or because an error has been encountered with the selected registration service
// or because the server is being shut down
condition.wait(lock, [&]{ return shutdown || registration_service_error || node_unregistered || most_recent_update < grain->updated; });
if (registration_service_error)
{
pop_registration_service(model.settings);
model.notify();
registration_service_error = false;
cancellation_source.cancel();
// wait without the lock since it is also used by the background tasks
details::reverse_lock_guard<nmos::write_lock> unlock{ lock };
request.wait();
heartbeats.wait();
registration_client.reset();
heartbeat_client.reset();
cancellation_source = pplx::cancellation_token_source();
}
if (shutdown || empty_registration_services(model.settings) || node_unregistered) break;
// "The Node selects a Registration API to use based on the priority"
if (!registration_client)
{
const auto base_uri = top_registration_service(model.settings);
// if Registration API version is different, force re-registration
const auto registry_version = parse_api_version(web::uri::split_path(base_uri.path()).back());
if (registry_version != grain->version) break;
const auto bearer_token = get_authorization_bearer_token ? get_authorization_bearer_token() : web::http::oauth2::experimental::oauth2_token{};
registration_client = nmos::details::make_http_client(base_uri, make_registration_client_config(model.settings, load_ca_certificates, bearer_token, gate));
heartbeat_client = nmos::details::make_http_client(base_uri, make_heartbeat_client_config(model.settings, load_ca_certificates, bearer_token, gate));
// "The first interaction with a new Registration API [after a server side or connectivity issue]
// should be a heartbeat to confirm whether the Node is still present in the registry"
slog::log<slog::severities::info>(gate, SLOG_FLF) << "Attempting registration heartbeats with the Registration API at: " << registration_client->base_uri().to_string();
node_registered = false;
const std::chrono::seconds heartbeat_interval(nmos::fields::registration_heartbeat_interval(model.settings));
auto token = cancellation_source.get_token();
heartbeat_time = std::chrono::steady_clock::now();
heartbeats = update_node_health(*heartbeat_client, self_id, gate, token).then([&](bool success)
{
auto lock = model.write_lock(); // in order to update local state
node_registered = success;
if (node_registered)
{
// Synchronous notification that registered operation has been adopted successfully
if (registration_changed)
{
// this callback should not throw exceptions
registration_changed(heartbeat_client->base_uri());
}
}
else
{
node_unregistered = true;
}
model.notify();
}).then([=, &model, &heartbeat_time, &heartbeat_client, &heartbeat_bearer_token, &gate]
{
// "6. The Node persists itself in the registry by issuing heartbeats."
return pplx::do_while([=, &model, &heartbeat_time, &heartbeat_client, &heartbeat_bearer_token, &gate]
{
return pplx::complete_at(heartbeat_time + heartbeat_interval, token).then([=, &model, &heartbeat_time, &heartbeat_client, &heartbeat_bearer_token, &gate]() mutable
{
heartbeat_time = std::chrono::steady_clock::now();
// renew heartbeat_client if bearer token has changed
if (get_authorization_bearer_token)
{
const auto& bearer_token = get_authorization_bearer_token();
if (heartbeat_bearer_token.access_token() != bearer_token.access_token())
{
slog::log<slog::severities::more_info>(gate, SLOG_FLF) << "Update heartbeat client with new authorization token";
heartbeat_bearer_token = bearer_token;
heartbeat_client = nmos::details::make_http_client(base_uri, make_heartbeat_client_config(model.settings, load_ca_certificates, bearer_token, gate));
}
}
return update_node_health(*heartbeat_client, self_id, gate, token);
});
}, token);
}).then([&](pplx::task<void> finally)
{
auto lock = model.write_lock(); // in order to update local state
try
{
finally.get();
node_unregistered = true;
}
catch (const web::http::http_exception& e)
{
slog::log<slog::severities::error>(gate, SLOG_FLF) << "Registration heartbeat HTTP error: " << e.what() << " [" << e.error_code() << "]";
registration_service_error = true;
}
catch (const registration_service_exception&)
{
registration_service_error = true;
}
model.notify();
});
// wait for the response from the first heartbeat that the Node is still registered (or not!)
condition.wait(lock, [&]{ return shutdown || registration_service_error || node_unregistered || node_registered; });
if (shutdown || registration_service_error || node_unregistered) continue;
}
events = web::json::value::array();
node_behaviour_grain_guard guard(resources, grain, events);
most_recent_update = grain->updated;
while (0 != events.size())