Skip to content

Karabo Bound API Reference

The PythonDevice class is the basis for all karabo.bound devices

Devices implemented in the karabo.bound API should derive from this class. It provides an interface to the distributed system and holds properties that represent the state of a device. Commands may be exposed through it using so-called slots.

Devices run in a separate process, but internally use an event loop with multiple threads to serve requests from the distributed system.

Source code in src/pythonKarabo/karabo/bound/device.py
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
@KARABO_CONFIGURATION_BASE_CLASS
@KARABO_CLASSINFO("PythonDevice", karaboVersion)
class PythonDevice:
    """The PythonDevice class is the basis for all karabo.bound devices

    Devices implemented in the karabo.bound API should derive from this
    class. It provides an interface to the distributed system and holds
    properties that represent the state of a device. Commands may be exposed
    through it using so-called slots.

    Devices run in a separate process, but internally use an event loop with
    multiple threads to serve requests from the distributed system.
    """

    instanceCountPerDeviceServer = dict()
    instanceCountLock = threading.Lock()
    connectionParams = Hash(Broker.brokerTypeFromEnv(), Hash())
    timeServerId = None
    _loggerCfg = None

    @staticmethod
    def expectedParameters(expected):
        (
            STRING_ELEMENT(expected).key("_deviceId_")
            .displayedName("_DeviceID_")
            .description("Do not set this property, it will be set by the"
                         " device-server")
            .expertAccess().assignmentInternal().noDefaultValue().init()
            .commit(),

            STRING_ELEMENT(expected).key("deviceId")
            .displayedName("DeviceID")
            .description("The device instance ID uniquely identifies a device"
                         " instance in the distributed system")
            .readOnly()
            .commit(),

            INT32_ELEMENT(expected).key("heartbeatInterval")
            .displayedName("Heartbeat interval")
            .description("The heartbeat interval")
            .assignmentOptional()
            .defaultValue(20)  # smaller than C++ device: own process!
            .minInc(10)  # avoid too much traffic
            .expertAccess()
            .commit(),

            STRING_ELEMENT(expected).key("_serverId_")
            .displayedName("_ServerID_")
            .description("Do not set this property, it will be set by the"
                         " device-server")
            .expertAccess().assignmentInternal().noDefaultValue().init()
            .commit(),

            STRING_ELEMENT(expected).key("classId")
            .displayedName("ClassID")
            .description("The (factory)-name of the class of this device")
            .readOnly().initialValue(PythonDevice.__classid__)
            .commit(),

            STRING_ELEMENT(expected).key("classVersion")
            .displayedName("Class version")
            .description("The version of the class of this device defined in"
                         " KARABO_CLASSINFO, prepended by package name")
            .expertAccess()
            # No version dependent initial value: It would make the static
            # schema version dependent, i.e. introduce fake changes.
            .readOnly()
            .commit(),

            STRING_ELEMENT(expected).key("karaboVersion")
            .displayedName("Karabo version")
            .description("The version of the Karabo framework running this "
                         "device")
            .expertAccess()
            .readOnly()
            # No version dependent initial value, see above at "classVersion".
            .commit(),

            STRING_ELEMENT(expected).key("serverId")
            .displayedName("ServerID")
            .description("The device-server on which this device "
                         "is running on")
            .expertAccess()
            .readOnly()
            .commit(),

            STRING_ELEMENT(expected).key("hostName")
            .displayedName("Host")
            .description("Do not set this property, it will be set by the"
                         " device-server.")
            .expertAccess()
            .assignmentInternal().noDefaultValue()
            .init()
            .commit(),

            INT32_ELEMENT(expected).key("pid")
            .displayedName("Process ID")
            .description("The unix process ID of the device")
            .expertAccess()
            .readOnly().initialValue(0)
            .commit(),

            STATE_ELEMENT(expected).key("state")
            .displayedName("State")
            .description("The current state the device is in")
            .initialValue(State.UNKNOWN)
            .commit(),

            STRING_ELEMENT(expected).key("status")
            .displayedName("Status")
            .description("A more detailed status description")
            .readOnly().initialValue("")
            .commit(),

            ALARM_ELEMENT(expected).key("alarmCondition")
            .displayedName("Alarm condition")
            .description("The current alarm condition of the device. Evaluates"
                         " to the highest condition on any property if not set"
                         " manually.")
            .initialValue(AlarmCondition.NONE)
            .commit(),

            STRING_ELEMENT(expected).key("lockedBy")
            .displayedName("Locked by")
            .reconfigurable()
            .assignmentOptional().defaultValue("")
            .setSpecialDisplayType("lockedBy")
            .commit(),

            SLOT_ELEMENT(expected).key("slotClearLock")
            .displayedName("Clear Lock")
            .expertAccess()
            .commit(),

            STRING_ELEMENT(expected).key("lastCommand")
            .displayedName("Last command")
            .description("The last slot called.")
            .expertAccess()
            .readOnly().initialValue("")
            .commit(),

            NODE_ELEMENT(expected).key("performanceStatistics")
            .displayedName("Performance Statistics")
            .description("Accumulates some statistics")
            .expertAccess()
            .commit(),

            BOOL_ELEMENT(expected).key("performanceStatistics."
                                       "messagingProblems")
            .displayedName("Messaging problems")
            .description("If true, there is a problem consuming"
                         " broker messages")
            .expertAccess()
            .readOnly().initialValue(False)
            .commit(),

            BOOL_ELEMENT(expected).key("performanceStatistics.enable")
            .displayedName("Enable Performance Indicators")
            .description("Enables some statistics to follow the"
                         " performance of an individual device")
            .reconfigurable()
            .expertAccess()
            .assignmentOptional().defaultValue(False)
            .commit(),

            FLOAT_ELEMENT(expected).key("performanceStatistics."
                                        "processingLatency")
            .displayedName("Processing latency")
            .description("Average time interval between remote message"
                         " sending and processing it in this device.")
            .unit(Unit.SECOND).metricPrefix(MetricPrefix.MILLI)
            .expertAccess()
            .readOnly().initialValue(0.0)
            .commit(),

            UINT32_ELEMENT(expected).key("performanceStatistics"
                                         ".maxProcessingLatency")
            .displayedName("Maximum latency")
            .description("Maximum processing latency within averaging"
                         " interval.")
            .unit(Unit.SECOND).metricPrefix(MetricPrefix.MILLI)
            .expertAccess()
            .readOnly().initialValue(0)
            .commit(),

            UINT32_ELEMENT(expected).key("performanceStatistics.numMessages")
            .displayedName("Number of messages")
            .description("Number of messages received within"
                         " averaging interval.")
            .unit(Unit.COUNT)
            .expertAccess()
            .readOnly().initialValue(0)
            .commit(),

            UINT32_ELEMENT(expected).key("performanceStatistics"
                                         ".maxEventLoopLatency")
            .displayedName("Max. event loop latency")
            .description("Maximum time interval between posting a message on"
                         " the central event loop and processing it within"
                         " averaging interval.")
            .unit(Unit.SECOND).metricPrefix(MetricPrefix.MILLI)
            .expertAccess()
            .readOnly().initialValue(0)
            .commit(),

            # Logging config:
            # Expose only the non-appender specific part (only 'priority' now).
            # Would like to use NODE_ELEMENT(..)...appendParametersOf(Logger)
            # and then remove again "ostream", "file" and "cache" from
            # expected["Logger"], but Schema.getParameterHash() returns a copy.
            NODE_ELEMENT(expected).key("Logger")
            .description("Logging settings")
            .displayedName("Logger")
            .expertAccess()
            .commit(),

            # Keep in sync with 'priority' in C++ Logger::expectedParameters
            STRING_ELEMENT(expected).key("Logger.priority")
            .displayedName("Priority")
            .description("The default log priority")
            .options("DEBUG INFO WARN ERROR FATAL")
            .assignmentOptional().defaultValue("INFO")
            .commit(),
        )

    log = None  # make always available, at least as None

    def __init__(self, configuration: Hash | None):
        """The initialization method of a device

        Expects a configuration passed as a Karabo Hash.

        :param configuration: the configuration Hash. It may contain the
               following entries:

               - _serverId_: there id of the hosting server
               - _deviceId_: a usually autogenerated device id
        """
        if configuration is None:
            raise ValueError("Configuration must be Hash object, not None")
        super().__init__()

        self._parameters = configuration
        if "_serverId_" in self._parameters:
            self.serverid = self._parameters["_serverId_"]
        else:
            self.serverid = "__none__"

        if "_deviceId_" in self._parameters:
            self.deviceid = self._parameters["_deviceId_"]
        else:
            self.deviceid = "__none__"  # TODO: generate uuid

        # Initialize threading locks...
        self._stateChangeLock = threading.Lock()
        self._stateDependentSchema = {}
        self._injectedSchema = Schema()

        # Initialize _client to None (important!)
        self._client = None

        # Initialize hostName
        if "hostName" not in self._parameters:
            self._parameters["hostName"] = (
                socket.gethostname().partition('.')[0])
        self.hostname = self._parameters["hostName"]

        # timeserver related
        self._timeLock = threading.Lock()
        self._timeId = 0
        self._timeSec = 0
        self._timeFrac = 0
        self._timePeriod = 0

        # Setup the validation classes
        rules = ValidatorValidationRules()
        rules.allowAdditionalKeys = False
        rules.allowMissingKeys = True
        rules.allowUnrootedConfiguration = True
        rules.injectDefaults = False
        rules.injectTimestamps = True
        rules.forceInjectedTimestamp = False  # allows to specify case-by-case
        # Internal validator for set(..)
        self.validatorIntern = Validator(rules)
        rules.forceInjectedTimestamp = True
        # External validator for slotReconfigure(..)
        self.validatorExtern = Validator(rules)

        # For broker error handler
        self.lastBrokerErrorStamp = 0

        self.initClassId()
        self.initSchema()

        with self._stateChangeLock:
            self._parameters.set("classId", self.classid)
            # class version is the (base) module name plus __version__ where
            # the latter comes from KARABO_CLASSINFO decorator and should be
            # the repository version
            clsVers = f"{self.__module__.split('.', 1)[0]}-{self.__version__}"
            self._parameters.set("classVersion", clsVers)
            self._parameters.set("karaboVersion", karaboVersion)
            self._parameters.set("deviceId", self.deviceid)
            self._parameters.set("serverId", self.serverid)
            self._parameters.set("pid", os.getpid())

            # Validate first time to assign timestamps
            # Note that invalid property keys are already caught via
            # Configurator(PythonDevice).create in launchPythonDevice below.
            result, error, validated = self.validatorIntern.validate(
                self._fullSchema, self._parameters,
                self.getActualTimestamp())
            if not result:
                raise RuntimeError(error)
            self._parameters.merge(validated,
                                   HashMergePolicy.REPLACE_ATTRIBUTES)

        # Create 'info' hash
        info = Hash("type", "device")
        info["classId"] = self.classid
        info["serverId"] = self.serverid
        info["host"] = self.hostname
        currentState = self["state"]
        if currentState is State.ERROR:
            status = "error"
        elif currentState is State.UNKNOWN:
            status = "unknown"
        else:
            status = "ok"
        info["status"] = status

        # device capabilities are encoded in a bit mask field
        capabilities = 0
        if configuration.has("availableScenes"):
            capabilities |= Capabilities.PROVIDES_SCENES
        if configuration.has("availableMacros"):
            capabilities |= Capabilities.PROVIDES_MACROS
        if configuration.has("interfaces"):
            capabilities |= Capabilities.PROVIDES_INTERFACES

        info["capabilities"] = capabilities

        interfaces = 0
        if configuration.has("interfaces"):
            for description in self.get("interfaces"):
                if description in Interfaces.__members__:
                    interfaces |= Interfaces[description]
                else:
                    raise NotImplementedError(
                        "Provided interface is not supported: {}".format(
                            description))

            info["interfaces"] = interfaces

        # Setup device logger (needs self._parameters) before SignalSlotable
        # to log e.g. broker setup (i.e. logging must not log to broker).
        self.loadLogger()
        self.log = Logger.getCategory(self.deviceid)

        # Instantiate SignalSlotable object
        self._sigslot = SignalSlotable(
            self.deviceid, PythonDevice.connectionParams,
            self._parameters["heartbeatInterval"], info)

        # Initialize Device slots and instantiate all channels
        self._initDeviceSlots()
        self._inputChannelHandlers = {}  # for re-injected InputChannel

        self.initChannels()

        # Register guard for slot calls
        self._sigslot.registerSlotCallGuardHandler(self.slotCallGuard)

        # Register updateLatencies handler
        self._sigslot.registerPerformanceStatisticsHandler(
            self.updateLatencies)

        self._sigslot.registerBrokerErrorHandler(self.onBrokerError)

        # Initial functions one can register
        self._func = []

    def registerInitialFunction(self, func):
        self._func.append(func)

    def startInitialFunctions(self):
        """Start initial functions: second constructors(?)"""
        # call initial function registered in the device constructor
        # in registration's order
        for f in self._func:
            f()

    def _finalizeInternalInitialization(self):
        # Start - after all settings/registrations done:
        # Communication (incl. system registration) starts and thus parallelism
        # This is done here and not yet in __init__ to be sure that inheriting
        # devices can register in their __init__ after super(..).__init__(..)
        self._sigslot.start()  # Can raise e.g. for invalid instanceId

        pid = self["pid"]
        self.log.INFO("'{0.classid}' with deviceId '{0.deviceid}' got started "
                      "on server '{0.serverid}', pid '{1}'.".format(self, pid))

        # Inform server that we are up - fire-and-forget is sufficient.
        self._sigslot.call(
            self.serverid, "slotDeviceUp", self.deviceid, True, "success")

        # Trigger connection of input channels
        self._sigslot.connectInputChannels()

        # An attempt to start initial functions a.k.a second constructor if any
        # via event loop. If an exception occurs, we will kill the device
        def wrapInitialFunctions():
            try:
                self.startInitialFunctions()
            except Exception as e:
                msg = f"{repr(e)} in initialisation"
                self.log.ERROR(msg)
                self.set("status", msg)
                self._sigslot.call("", "slotKillDevice")

        # As long as the below connect(..) to the time server is not turned
        # into a non-blocking asyncConnect(..), we better add a thread here:
        # Otherwise, we can get into trouble if the init function (registered
        # by the device coder and called from self.startInitialFunctions()
        # above) also does some synchronous operation (e.g. another
        # connect(..)):
        # Both operations block at the same time and we have only two threads
        # (main one and one from our SignalSlotable), so the blocking cannot
        # be resolved and thus both actions time out.
        EventLoop.addThread()
        EventLoop.post(wrapInitialFunctions)

        if self.timeServerId:
            self.log.DEBUG("Connecting to time server : \"{}\""
                           .format(self.timeServerId))
            # TODO 2: Better use asyncConnect!
            self._sigslot.connect(self.timeServerId, "signalTimeTick",
                                  "slotTimeTick")

    @property
    def signalSlotable(self):
        """Get SignalSlotable object embedded in PythonDevice instance."""
        return self._sigslot

    def loadLogger(self):
        """Load the distributed logger

        Uses config in self._parameters["Logger"] and PythonDevice._loggerCfg
        """
        # Take cfg as passed from server and merge device specific settings
        if PythonDevice._loggerCfg is None:  # for now if started from MDL
            config = self._parameters["Logger"]
        else:
            config = copy.copy(PythonDevice._loggerCfg)
            config.merge(self._parameters["Logger"])

        # Cure the file name of file logger: own dir inside server's log dir:
        if 'file.filename' in config:
            serverLogDir = os.path.dirname(config['file.filename'])
            path = os.path.join(serverLogDir, self.deviceid)
        else:  # Again, for now if started from MDL
            path = os.path.join(os.environ["KARABO"], "var", "log",
                                self.serverid, self.deviceid)
        if not os.path.isdir(path):
            os.makedirs(path)
        path = os.path.join(path, 'device.log')
        config.set('file.filename', path)

        # finally configure the logger
        Logger.configure(config)
        Logger.useOstream()
        Logger.useFile()
        Logger.useCache()

    def remote(self):
        """Return a DeviceClient instance.

        The DeviceClient will use this device's SignalSlotable to interact with
        the distributed system
        :return:
        """
        if self._client is None:
            # SignalSlotable object for reuse
            self._client = DeviceClient(self._sigslot)
        return self._client

    def set(self, *args, **kwargs):
        """Updates device properties and notifies any observers.
        Note that an update of the device "state" property must be done using
        updateState(..).

        args: can be of length

            * one: expects a Hash, and uses current timestamp
            * two: expects a key, value pair and uses current timestamp or a
                   Hash, timestamp pair
            * three: expects key, value and timestamp

        kwargs: validate: specifies if validation of args should be performed
                before notification. Skipping validation should not be used
                with State or AlarmCondition.

        If a Hash is provided, its keys should be device properties and the
        values should have the proper types. A State or AlarmCondition inside
        a Hash should be given as a string.
        """
        with self._stateChangeLock:
            self._setNoStateLock(*args, **kwargs)

    def _setNoStateLock(self, *args, **kwargs):
        """"
        Internal helper like set, but requires 'with self._stateChangeLock:'
        """

        pars = tuple(args)
        validate = kwargs.get("validate", True)

        if len(pars) == 0 or len(pars) > 3:
            raise SyntaxError("Number of parameters is wrong: "
                              "from 1 to 3 arguments are allowed.")

        # key, value, timestamp args
        if len(pars) == 3:
            key, value, stamp = pars

            if not isinstance(stamp, Timestamp):
                raise TypeError("The 3rd argument should be Timestamp")

            h = Hash()
            # State and AlarmCondition are set as strings - the validator will
            # add necessary "indicateState"/"indicateAlarm" attributes.
            # (Will not work if validate=False!)
            if isinstance(value, State):
                h.set(key, value.name)
            elif isinstance(value, AlarmCondition):
                h.set(key, value.asString())
            else:
                h.set(key, value)
            pars = tuple([h, stamp])

        # hash args
        if len(pars) == 1:
            h = pars[0]
            if not isinstance(h, Hash):
                raise TypeError("The only argument should be a Hash")
            pars = tuple([h, self.getActualTimestamp()])  # add timestamp

        # key, value or hash, timestamp args
        if len(pars) == 2:
            if not isinstance(pars[0], Hash):
                key, value = pars

                h = Hash()
                # See comment above about State and AlarmCondition
                if isinstance(value, State):
                    h.set(key, value.name)
                elif isinstance(value, AlarmCondition):
                    h.set(key, value.asString())
                else:
                    h.set(key, value)
                pars = tuple([h, self.getActualTimestamp()])

            hash, stamp = pars

            validated = None

            if validate:
                result, error, validated = self.validatorIntern.validate(
                    self._fullSchema, hash, stamp)
                if not result:
                    raise RuntimeError("Bad parameter setting attempted, "
                                       "ignore keys {}. Validation "
                                       "reports: {}".format(hash.keys(),
                                                            error))
            else:
                validated = hash
                # Add timestamps
                for path in validated.getPaths():
                    node = validated.getNode(path)
                    attributes = node.getAttributes()
                    stamp.toHashAttributes(attributes)

            if not validated.empty():
                self._parameters.merge(
                    validated, HashMergePolicy.REPLACE_ATTRIBUTES)

                self._sigslot.emit("signalChanged", validated, self.deviceid)

    def setVectorUpdate(self, key, updates, updateType, timestamp=None):
        """Concurrency safe update of vector property (not for tables)

        :param key: key of the vector property to update
        :param updates: iterable of items to remove from property vector
                       (starting at the front) or to add (at the end)
        :param updateType: indicates update type, applied individually to all
                           items in 'updates',
                           one of "add", "addIfNotIn", "removeOne", "removeAll"
        :param timestamp: optional timestamp to assign to updated vector
                          property, defaults to self.getActualTimestamp()
        """
        if timestamp is None:
            timestamp = self.getActualTimestamp()

        with self._stateChangeLock:
            # vec is a copy, so we are safe if _setNoStateLock raises
            vec = self._parameters.get(key)
            if updateType == "add":
                vec.extend(updates)
            else:
                for update in updates:
                    if updateType == "addIfNotIn":
                        if update not in vec:
                            vec.append(update)
                    elif updateType == "removeOne":
                        if update in vec:
                            vec.remove(update)
                    elif updateType == "removeAll":
                        for _ in range(vec.count(update)):
                            vec.remove(update)
                    else:
                        raise ValueError(f"Unknown updateType '{updateType}'")
            # Finally update the property
            self._setNoStateLock(key, vec, timestamp)

    def slotLoggerContent(self, info):
        """Slot call to receive logger content from the print logger

        This slot is similar to `slotLoggerContent` for servers except that
        the `serverId` key is substituted with key `deviceId`.

        look in the device_server module for detailed informations
        """
        nMessages = info.get("logs", default=KARABO_LOGGER_CONTENT_DEFAULT)
        content = Logger.getCachedContent(nMessages)
        self._sigslot.reply(
            Hash("deviceId", self.deviceid, "content", content))

    def __setitem__(self, key, value):
        """Alternative to `self.set`: `self[key] = value`

        The timestamp is set to the current timestamp
        """
        self.set(key, value, self.getActualTimestamp())

    def writeChannel(self, channelName, data,
                     timestamp=None, safeNDArray=False):
        """Write data to an output channel.

        :param channelName: name given to an OUTPUT_CHANNEL in
                            expectedParameters
        :param data: a Hash with keys as described in the Schema of the
                     channel
        :param timestamp: optional timestamp; if none is given, the current
                          timestamp is used
        :param safeNDArray: Boolean that should be set to 'True' if 'data'
                            contains any 'NDArray' and their data is not
                            changed after this 'writeChannel'. Otherwise,
                            data will be copied if needed, i.e. when the output
                            channel has to queue or serves inner-process
                            receivers.

        Example for an output channel sending an image (key: "image") and
        a frame number (key: "frame"):

        imgArray = numpy.array(...)
        self.writeChannel("output", Hash("image", ImageData(imgArray),
                                         "frame", frameNumber))
        Note:
        The methods 'writeChannel(..)' and 'signalEndOfStream(..)'
        must not be called concurrently.
        """

        channel = self._sigslot.getOutputChannel(channelName)
        sourceName = f"{self.getInstanceId()}:{channelName}"
        if not timestamp:
            timestamp = self.getActualTimestamp()
        meta = ChannelMetaData(sourceName, timestamp)
        channel.write(data, meta)
        channel.update(safeNDArray=safeNDArray)

    def signalEndOfStream(self, channelName):
        """Signal an end-of-stream event

        The channel is identified by `channnelName`

        Note:
        The methods 'writeChannel(..)' and 'signalEndOfStream(..)'
        must not be called concurrently.
        """
        self._sigslot.getOutputChannel(channelName).signalEndOfStream()

    def get(self, key):
        """Return a property of this device

        :param key: as defined in the expected parameter section
        :return: the value of the property
        """
        with self._stateChangeLock:
            try:
                result = self._parameters[key]
                if not self._fullSchema.hasClassId(key):
                    classId = None
                else:
                    classId = self._fullSchema.getClassId(key)
                if classId == KARABO_CLASS_ID_STATE:
                    return State(result)
                elif classId == KARABO_CLASS_ID_ALARM:
                    return AlarmCondition(result)
                elif isinstance(result, (Hash, VectorHash)):
                    # For Hash and VectorHash, 'result' is a reference, so if
                    # it would be returned and the returned object would be
                    # changed, self._parameters would be changed as well, pro-
                    # viding a back door without using self._stateChangeLock!
                    return copy.copy(result)
                else:
                    # Note that vectors of numbers are copies
                    return result
            except RuntimeError as e:
                print(e)
                raise AttributeError(
                    f"Error while retrieving '{key}' from device")

    def __getitem__(self, key):
        """Alternative for `value = self.get(key)`: `value = self[key]`"""
        return self.get(key)

    def getFullSchema(self):
        """Return the full schema describing this device

        :return: a karabo Schema object
        """
        # Have to copy to protect using it while updating
        s = Schema()
        with self._stateChangeLock:
            s.copy(self._fullSchema)
        return s

    def updateSchema(self, schema):
        """Updates the existing device schema
        It merges the schema in argument to the static schema defined in
        expectedParameters, removing any previous schema injections.

        If a property is being reinjected, and of the same type, then it will
        keep its current value. If it does not fit within range, an error will
        be raised.
        Likewise, if the type changes, and the value cannot be cast, an error
        will be raised.

        Input and output channels will be created if injected and removed again
        in case updateSchema is called again without them.
        An output channel is also recreated if its schema changes.
        Note that for newly created input channels there are no data, input
        and end-of-stream handlers registered. This has to be done
        via the corresponding self.KARABO_ON_[DATA|INPUT|EOS] methods.
        If an InputChannel is re-injected, its handlers are kept.

        :param schema: to be merged with the static schema
        """
        rules = ValidatorValidationRules()
        rules.allowAdditionalKeys = True
        rules.allowMissingKeys = True
        rules.allowUnrootedConfiguration = True
        rules.injectDefaults = True
        rules.injectTimestamps = True
        validator = Validator()
        validator.setValidationRules(rules)
        _, _, validated = validator.validate(schema, Hash(),
                                             self.getActualTimestamp())

        with self._stateChangeLock:
            for path in self._injectedSchema.getPaths():
                if not (self._staticSchema.has(path) or schema.has(path)):
                    self._parameters.erasePath(path)
                    # Now we might have removed 'n.m.l.c' completely although
                    # 'n.m' is in static schema - restore (empty) node 'n.m':
                    pathSplit = path.split('.')
                    for i in range(1, len(pathSplit)):
                        p = ".".join(pathSplit[0:-i])  # 'n.m.l', 'n.m', 'n'
                        if (self._staticSchema.has(p)
                                and not self._parameters.has(p)):
                            self._parameters[p] = Hash()
                            # 'n.m' added added back (after 'n.m.l' failed)
                            break

            self._stateDependentSchema.clear()

            prevFullSchemaLeaves = [p for p in self._fullSchema.getPaths()
                                    if not self._fullSchema.isNode(p)]

            # Erase previously present injected InputChannels
            for inChannel in self._sigslot.getInputChannelNames():
                if self._staticSchema.has(inChannel):
                    # Do not touch static one
                    # (even if re-injected to change properties).
                    continue
                if self._injectedSchema.has(inChannel):
                    self.log.INFO("updateSchema: Remove input channel '"
                                  f"{inChannel}'")
                    self._sigslot.removeInputChannel(inChannel)
                    if not schema.has(inChannel):
                        # not re-injected - clear handler back-up
                        del self._inputChannelHandlers[inChannel]
            # Treat injected OutputChannels
            outChannelsToRecreate = set()
            for outChannel in self._sigslot.getOutputChannelNames():
                if self._injectedSchema.has(outChannel):
                    if self._staticSchema.has(outChannel):
                        # Channel changes its schema back to its default
                        outChannelsToRecreate.add(outChannel)
                    else:
                        # Previously injected channel has to be removed
                        self.log.INFO("updateSchema: Remove output channel '"
                                      f"{outChannel}'")
                        self._sigslot.removeOutputChannel(outChannel)
                if (self._staticSchema.has(outChannel)
                    and schema.has(outChannel)
                    and (not schema.hasClassId(outChannel)
                         or schema.getClassId(outChannel)
                         != "OutputChannel"
                         )):
                    outChannelsToRecreate.add(outChannel)

            self._injectedSchema.copy(schema)
            self._fullSchema.copy(self._staticSchema)
            self._fullSchema += self._injectedSchema

            # notify the distributed system...
            self._sigslot.emit("signalSchemaUpdated",
                               self._fullSchema, self.deviceid)

            # Keep new leaves only. This hash is then set, to avoid re-sending
            # updates with the same value.
            for path in prevFullSchemaLeaves:
                validated.erasePath(path)

            self._setNoStateLock(validated)

            # Init any freshly injected channels
            self._initChannels(topLevel="", schema=self._injectedSchema)
            # ... and those with potential Schema change
            for outToCreate in outChannelsToRecreate:
                self.log.INFO("updateSchema triggers creation of output "
                              f"channel '{outToCreate}'")
                self._prepareOutputChannel(outToCreate)

        self.log.INFO("Schema updated")

    def appendSchema(self, schema):
        """Append to the existing device schema

        If a property is being reinjected, and of the same type, then it will
        keep its current value. If it does not fit within range, an error will
        be raised.
        Likewise, if the type changes, and the value cannot be cast, an error
        will be raised.

        Input and output channels will be created if injected.
        An output channel is also recreated if its schema changes, to make the
        other end aware.
        Note that for newly created input channels there are no data, input
        and end-of-stream handlers registered. This has to be done
        via the corresponding self.KARABO_ON_[DATA|INPUT|EOS] methods.
        If an InputChannel is re-injected, its handlers are kept.

        :param schema: to append to current full schema
        """
        rules = ValidatorValidationRules()
        rules.allowAdditionalKeys = True
        rules.allowMissingKeys = True
        rules.allowUnrootedConfiguration = True
        rules.injectDefaults = True
        rules.injectTimestamps = True
        validator = Validator()
        validator.setValidationRules(rules)
        _, _, validated = validator.validate(schema, Hash(),
                                             self.getActualTimestamp())

        with self._stateChangeLock:
            # Take care of OutputChannels schema changes
            outChannelsToRecreate = set()
            for path in self._sigslot.getOutputChannelNames():
                if (self._fullSchema.has(path) and schema.has(path)
                    and (not schema.hasClassId(path) or
                         schema.getClassId(path) != "OutputChannel")):
                    # maybe output schema change without using OUTPUT_CHANNEL
                    outChannelsToRecreate.add(path)
                # elif schema.getClassId(path) == "OutputChannel":
                #      will be recreated by _initChannels(schema) below

            self._stateDependentSchema = {}
            self._injectedSchema += schema

            prevFullSchemaLeaves = [p for p in self._fullSchema.getPaths()
                                    if not self._fullSchema.isNode(p)]
            self._fullSchema.copy(self._staticSchema)
            self._fullSchema += self._injectedSchema

            # notify the distributed system...
            self._sigslot.emit("signalSchemaUpdated", self._fullSchema,
                               self.deviceid)

            # Keep new leaves only. This hash is then set, to avoid re-sending
            # updates with the same value.
            for path in prevFullSchemaLeaves:
                validated.erasePath(path)
            self._setNoStateLock(validated)

            # Init any freshly injected channels
            self._initChannels(topLevel="", schema=schema)
            # ... and those output channels with potential Schema change
            for outToCreate in outChannelsToRecreate:
                self.log.INFO("appendSchema triggers creation of output "
                              f"channel '{outToCreate}'")
                self._prepareOutputChannel(outToCreate)

        self.log.INFO("Schema appended")

    def appendSchemaMaxSize(self, path, value, emitFlag=True):
        """
        Append Schema to change/set maximum size information for path.
        If paths does not exist, raise KeyError.

        This is similar to the more general appendSchema, but dedicated to a
        common use case.

        :param path  indicates the parameter which should be a
                     Vector- or TableElement
        :param value is the new maximum size of the element
        :param emitFlag indicates if others should be informed about this
                        Schema update. If this method is called for a bunch of
                        paths, it is recommended to set this to True only for
                        the last call.
        """
        with self._stateChangeLock:
            if not self._fullSchema.has(path):
                raise KeyError("Path '{}' not found in the device schema."
                               .format(path))

            self._stateDependentSchema = {}
            # Do not touch static schema - that must be restorable via
            # updateSchema(Schema())
            # OVERWRITE_ELEMENT checks whether max size attribute makes sense
            # for path
            (OVERWRITE_ELEMENT(self._fullSchema).key(path)
             .setNewMaxSize(value).commit(),)
            if self._injectedSchema.has(path):
                (OVERWRITE_ELEMENT(self._injectedSchema).key(path)
                 .setNewMaxSize(value).commit(),)

            if emitFlag:
                self._sigslot.emit("signalSchemaUpdated",
                                   self._fullSchema, self.deviceid)

    def getAliasFromKey(self, key, aliasReferenceType):
        """
        Return the alias of a key
        :param key: to return the alias from
        :param aliasReferenceType: type the alias is of
        :return: an object of aliasReferenceType
        """
        try:
            with self._stateChangeLock:
                return self._fullSchema.getAliasFromKey(key,
                                                        aliasReferenceType)
        except RuntimeError as e:
            raise AttributeError("Error while retrieving alias from parameter"
                                 " ({}): {}".format(key, e))

    def getKeyFromAlias(self, alias):
        """Return the key mapping to a given alias"""
        try:
            with self._stateChangeLock:
                return self._fullSchema.getKeyFromAlias(alias)
        except RuntimeError as e:
            raise AttributeError("Error while retrieving parameter from alias"
                                 " ({}): {}".format(alias, e))

    def aliasHasKey(self, alias):
        """Check if a key for a given alias exists"""
        with self._stateChangeLock:
            return self._fullSchema.aliasHasKey(alias)

    def keyHasAlias(self, key):
        """Check if a given key has an alias defined"""
        with self._stateChangeLock:
            return self._fullSchema.keyHasAlias(key)

    def getValueType(self, key):
        """Get the ValueType of a given key

        :returns: The type in terms of `karabo::util::ReferenceTypes`
        """
        with self._stateChangeLock:
            return self._fullSchema.getValueType(key)

    def getCurrentConfiguration(self, tags=""):
        """Return the current configuration, optionally filtered by tags

        :param tags: a string, with several entries separated by commas
                    spaces or semicolons. Set to an empty string if no
                    filtering is to be applied.
        :return: a configuration Hash
        """
        with self._stateChangeLock:
            if tags == "":
                # Outside the state change lock we need a copy:
                return copy.copy(self._parameters)
            else:
                return HashFilter.byTag(self._fullSchema, self._parameters,
                                        tags, " ,;")

    def getCurrentConfigurationSlice(self, paths):
        """Retrieves a slice of the current configuration.

        :param paths: of the configuration which should be returned
                      (as declared in expectedParameters,
                       method throws if a non-existing path is given)
        :return: Hash with the current values and attributes (e.g. timestamp)
                 of the selected configuration
        """
        result = Hash()
        with self._stateChangeLock:
            for p in paths:
                node = self._parameters.getNode(p)
                # with normal set, type deduction for empty vector may fail
                result.setAs(p, node.getValue(), node.getType())
                newNode = result.getNode(p)
                newNode.setAttributes(node.getAttributes())

        return result

    def filterByTags(self, configuration, tags):
        """Filter a given configuration Hash by tags

        :param configuration:
        :param tags: a string, with several entries separated by commas
                     spaces or semicolons
        :return: the filtered configuration Hash
        """
        with self._stateChangeLock:
            return HashFilter.byTag(self._fullSchema, configuration,
                                    tags, " ,;")

    def getServerId(self):
        """Return the id of the server hosting this devices"""

        return self.serverid

    def getAvailableInstances(self):
        """Return available instances in the distributed system"""

        return self._sigslot.getAvailableInstances()

    def preReconfigure(self, incomingReconfiguration):
        """
        Use this hook to alter a configuration Hash before it gets applied to
        the device and the distributed system is notified of the change.
        :param incomingReconfiguration:
        """

    def postReconfigure(self):
        """
        Use this hook to react on configuration changes after they have been
        validated and applied to the device, and have been notified to the
        distributed system.
        """

    def preDestruction(self):
        """
        Use this hook if you need to perform clean-up actions before a device
        gets destroyed.
        """

    def initClassId(self):
        self.classid = self.__class__.__classid__

    def initSchema(self):
        self._staticSchema = PythonDevice.getSchema(self.classid)
        self._fullSchema = Schema(self.classid)
        self._fullSchema.copy(self._staticSchema)

    def updateState(self, newState, propertyUpdates=None, timestamp=None):
        """Update the state property of the device to a new state.

        :param newState: the state to set the device to
        :propertyUpdates: a Hash with further properties to update (or None)
        :timestamp: timestamp to be assigned to the update,
                    if None, use self.getActualTimestamp()
        :return:
        """
        assert isinstance(newState, State)
        stateName = newState.name
        self.log.DEBUG(f"updateState: {stateName}")
        if propertyUpdates is None:
            propertyUpdates = Hash()
        if timestamp is None:
            timestamp = self.getActualTimestamp()

        newStatus = None
        with self._stateChangeLock:
            if self._parameters["state"] != stateName:
                propertyUpdates.set("state", stateName)
                # Validator adds "indicateState" attribute
                if newState is State.ERROR:
                    newStatus = "error"
                elif newState is State.UNKNOWN:
                    newStatus = "unknown"
                else:
                    statuses = ("error", "unknown")
                    if self._sigslot.getInstanceInfo()["status"] in statuses:
                        newStatus = "ok"

            if propertyUpdates:
                self._setNoStateLock(propertyUpdates, timestamp)

        # Send potential instanceInfo update without state change lock
        if newStatus:
            self._sigslot.updateInstanceInfo(Hash("status", newStatus))
        # place new state as default reply to interested event initiators
        self._sigslot.reply(stateName)

    def noStateTransition(self, currentState, currentEvent):
        """
        This function is called if a requested state transition is not allowed
        in the current context. Usually, this means you have an error in your
        state machine.
        """
        self.log.WARN("Device \"{}\" being in state '{}' does not allow the"
                      " transition for event '{}'."
                      .format(self.deviceid, currentState, currentEvent))

    def onTimeUpdate(self, id, sec, frac, period):
        """Called when an update from the time server is received

        :param id: train id
        :param sec: seconds
        :param frac: fractional seconds
        :param period:
        :return:
        """

    def KARABO_SLOT(self, slot, slotName=None, numArgs=None):
        """Register a slot in the distributed system.

        :param slot is the callable to register
        :param slotname is used to call the slot, 'None' means slot.__name__,
                        note that a '_' can also be called as '.' as needed
                        for nested slots in the Schema like 'node.slotInNode'
        :param numArgs number of arguments that the slot has,
                       'None' means to (try to) deduce from 'slot'

        Note that a slot is only connected with a SLOT_ELEMENT if the key of
        the SLOT_ELEMENT matches the slot name provided to this function.

            SLOT_ELEMENT(expected).key("slotDoSomething")

            ....

            self.KARABO_SLOT(slotDoSomething)

            .....

            def slotDoSomething(self):
                pass

        For slots under a node, the method name needs to replace '.' by '_'.

            SLOT_ELEMENT(expected).key("node.slotOther")

            ....

            self.KARABO_SLOT(node_slotOther)

            ....

            def node_slotOther(self):
                pass
        """

        if slotName is None:
            if numArgs is None:
                self._sigslot.registerSlot(slot)
            else:
                self._sigslot.registerSlot(slot, numArgs=numArgs)
        elif numArgs is None:
            self._sigslot.registerSlot(slot, slotName)
        else:
            self._sigslot.registerSlot(slot, slotName, numArgs)

    def _initDeviceSlots(self):
        # Register intrinsic signals
        # changeHash, instanceId
        self._sigslot.registerSignal("signalChanged", Hash, str)
        # schema, deviceid
        self._sigslot.registerSignal("signalSchemaUpdated", Schema, str)

        # Register intrinsic slots
        self._sigslot.registerSlot(self.slotReconfigure)
        self._sigslot.registerSlot(self.slotGetConfiguration)
        self._sigslot.registerSlot(self.slotGetConfigurationSlice)
        self._sigslot.registerSlot(self.slotGetSchema)
        self._sigslot.registerSlot(self.slotKillDevice)
        # Timeserver related slots
        self._sigslot.registerSlot(self.slotTimeTick)
        self._sigslot.registerSlot(self.slotGetTime)

        self._sigslot.registerSlot(self.slotLoggerPriority)
        self._sigslot.registerSlot(self.slotLoggerContent)
        self._sigslot.registerSlot(self.slotClearLock)

    def initChannels(self, topLevel="", schema=None):
        """
        Initialise Input-/OutputChannels
        :param schema to recurse for channels - if None, use self._fullSchema
        :param topLevel is path in schema hierarchy where to start recursion
        """
        with self._stateChangeLock:
            if schema is None:
                schema = self._fullSchema
            self._initChannels(topLevel, schema)

    def _initChannels(self, topLevel, schema):
        """
        Helper for initChannels, requiring _stateChangeLock protection
        """
        # Keys under topLevel, without leading "topLevel.":
        subKeys = schema.getKeys(topLevel)
        # Now go recursively down the node:
        for subKey in subKeys:
            key = topLevel + '.' + subKey if topLevel else subKey
            if schema.hasClassId(key):
                classId = schema.getClassId(key)
                if classId == "OutputChannel":
                    self._prepareOutputChannel(key)
                elif classId == "InputChannel":
                    self._prepareInputChannel(key)
                else:
                    self.log.DEBUG("Not creating in-/output channel for '"
                                   + key + "' since it's a '"
                                   + classId + "'")
            elif schema.isNode(key):
                # Recursively go down the tree for channels within nodes
                self.log.DEBUG("Looking for input/output channels " +
                               "under node '" + key + "'")
                self._initChannels(key, schema)

    def _prepareOutputChannel(self, path):
        """
        Internal method to create an OutputChannel for given path.
        Needs _stateChangeLock protection
        """
        self.log.INFO(f"Creating output channel '{path}'")
        outputChannel = self._sigslot.createOutputChannel(
            path, self._parameters)
        if not outputChannel:
            self.log.ERROR(f"Failed to create output channel "
                           f"'{path}'")
        else:
            def connectionsHandler(table):
                with self._stateChangeLock:
                    if self._fullSchema.has(path):
                        self._setNoStateLock(path + ".connections", table)
                    # else might just be removed by self.updateSchema

            outputChannel.registerShowConnectionsHandler(connectionsHandler)

            def statsHandler(bytesRead, bytesWritten):
                with self._stateChangeLock:
                    if self._fullSchema.has(path):
                        h = Hash(path + ".bytesRead", bytesRead,
                                 path + ".bytesWritten", bytesWritten)
                        self._setNoStateLock(h)
                    # else might just be removed by self.updateSchema

            outputChannel.registerShowStatisticsHandler(statsHandler)
            # Publish the resolved address of the output channel
            update = Hash(path, outputChannel.getInitialConfiguration())
            # No lock since this method requires _stateChangeLock protection
            self._setNoStateLock(update, self.getActualTimestamp())

    def _prepareInputChannel(self, path):
        """
        Internal method to create an InputChannel for given path.
        Needs _stateChangeLock protection
        """
        self.log.INFO(f"Creating input channel '{path}'")
        handlers = self._inputChannelHandlers.get(path, [None] * 3)

        def tracker(name, status):
            if (status == ConnectionStatus.CONNECTING
                    or status == ConnectionStatus.DISCONNECTING):
                return  # ignore any intermediate connection status
            updateType = "addIfNotIn"
            if status == ConnectionStatus.CONNECTED:
                updateType = "removeOne"
            self.setVectorUpdate(path + ".missingConnections",
                                 [name], updateType)

        self._sigslot.createInputChannel(path, self._parameters, handlers[0],
                                         handlers[1], handlers[2], tracker)
        h = Hash(path + ".missingConnections",
                 self._parameters.get(path + ".connectedOutputChannels"))
        self._setNoStateLock(h)

    def KARABO_ON_DATA(self, channelName, handlerPerData):
        """Registers a data handler function

        This function will be called if data is received on an input channel
        identified by `channelName`. The handler function should have the
        signature:

            def onData(data, metaData):
                pass

        where `data` and `metaData` are both Hashes.
        """
        self._inputChannelHandlers.setdefault(channelName, [None] * 3)
        self._inputChannelHandlers[channelName][0] = handlerPerData
        self._sigslot.registerDataHandler(channelName, handlerPerData)

    def KARABO_ON_INPUT(self, channelName, handlerPerInput):
        """Registers an input handler function

        Registers a handler to be called if data is available on the input
        channel identified by `channelName`. It is up to the device developer
        to read data (in contrast to the `KARABO_ON_DATA` registration).

            def onInput(input):
                for i in range(input.size()):
                    data, metaData = input.read(i)

        Here `input` is a reference to the input channel.
        """
        self._inputChannelHandlers.setdefault(channelName, [None] * 3)
        self._inputChannelHandlers[channelName][1] = handlerPerInput
        self._sigslot.registerInputHandler(channelName, handlerPerInput)

    def KARABO_ON_EOS(self, channelName, handler):
        """Registers an end-of-stream handler

        Registers a handler to be called if input channel identified by
        `channelName` is signaled end-of-stream.

        The handler function should  have the signature

             def onEos(input):
                 pass

        where `input` is a reference to the input channel.
        """
        self._inputChannelHandlers.setdefault(channelName, [None] * 3)
        self._inputChannelHandlers[channelName][2] = handler
        self._sigslot.registerEndOfStreamHandler(channelName, handler)

    def execute(self, command, *args):
        if len(args) == 0:
            self._sigslot.call("", command)
        elif len(args) == 1:
            self._sigslot.call("", command, args[0])
        elif len(args) == 2:
            self._sigslot.call("", command, args[0], args[1])
        elif len(args) == 3:
            self._sigslot.call("", command, args[0], args[1], args[2])
        elif len(args) == 4:
            self._sigslot.call("", command, args[0], args[1], args[2], args[3])
        else:
            raise AttributeError(
                "Number of command parameters should not exceed 4")

    def slotCallGuard(self, slotName, callee):
        # Check whether the slot is mentioned in the expectedParameters
        # as the call guard only works on those and will ignore all others
        with self._stateChangeLock:
            isSchemaSlot = self._fullSchema.has(slotName)

        # Check whether the slot can be called given the current locking state
        lockableSlot = isSchemaSlot or slotName == "slotReconfigure"
        if self.allowLock() and lockableSlot and slotName != "slotClearLock":
            self._ensureSlotIsValidUnderCurrentLock(slotName, callee)

        if isSchemaSlot:
            with self._stateChangeLock:
                if self._fullSchema.hasAllowedStates(slotName):
                    allowedStates = self._fullSchema.getAllowedStates(slotName)
                    if allowedStates:
                        currentState = State(self._parameters["state"])
                        if currentState not in allowedStates:
                            msg = "Command \"{}\" is not allowed in current " \
                                  "state \"{}\" of device \"{}\""\
                                .format(slotName, currentState.name,
                                        self.deviceid)
                            raise RuntimeError(msg)

        if lockableSlot:
            # Log the call of this slot by setting a parameter of the device
            self.set("lastCommand", slotName + " <- " + callee)

    def allowLock(self):
        """
        Overwrite this function for service devices that cannot be locked
        :return:
        """
        return True

    def slotClearLock(self):
        """ Clear the lock on this device
        """
        self.set("lockedBy", "")

    def slotGetTime(self, info):
        """
        Return the actual time information of this device

        :param info: An empty place holder hash

        This slot returns a Hash with:

        - key ``time`` and the attributes provide an actual
        timestamp with train Id information
        - key ``timeServerId`` to show the configured time server
        - key ``reference`` and the attributes provide the latest
        received timestamp information from the timeserver
        """
        result = Hash()

        result.set("time", True)
        stamp = self.getActualTimestamp()
        stamp.toHashAttributes(result.getAttributes("time"))

        # Provide a nice output for the time server Id
        timeServer = 'None' if not self.timeServerId else self.timeServerId
        result.set("timeServerId", timeServer)

        # And the last reference stamp received
        result.set("reference", True)

        with self._timeLock:
            epoch = Epochstamp(self._timeSec, self._timeFrac)
            train = Trainstamp(self._timeId)
            stamp = Timestamp(epoch, train)

        attrs = result.getAttributes("reference")
        stamp.toHashAttributes(attrs)

        self.reply(result)

    def _ensureSlotIsValidUnderCurrentLock(self, slotName, callee):
        lockHolder = self["lockedBy"]
        if lockHolder:
            msg = "{} is locked by {} and called by {}"
            self.log.DEBUG(msg.format(self.deviceid, lockHolder, callee))
            if callee != "unknown" and callee != lockHolder:
                msg = "Command {} is not allowed as device is locked by {}"
                raise RuntimeError(msg.format(slotName, lockHolder))

    def slotGetConfiguration(self):
        with self._stateChangeLock:
            self._sigslot.reply(self._parameters, self.deviceid)

    def slotGetConfigurationSlice(self, info):
        paths = info.get("paths")
        cfgSlice = self.getCurrentConfigurationSlice(paths)
        self._sigslot.reply(cfgSlice)

    def slotReconfigure(self, newConfiguration):
        if newConfiguration.empty():
            return
        result, error, validated = self._validate(newConfiguration)
        if result:
            self.preReconfigure(validated)
            self._applyReconfiguration(validated)
            self.postReconfigure()
        else:
            raise ValueError(error)

    def _validate(self, unvalidated):
        currentState = self["state"]
        whiteList = self._getStateDependentSchema(currentState)
        flag, error, validated = self.validatorExtern.validate(
            whiteList, unvalidated, self.getActualTimestamp())
        return (flag, error, validated)

    def _applyReconfiguration(self, reconfiguration):

        with self._stateChangeLock:
            self._parameters += reconfiguration

        self._sigslot.emit("signalChanged", reconfiguration, self.deviceid)

    def slotGetSchema(self, onlyCurrentState):
        # state lock!
        if onlyCurrentState:
            currentState = self["state"]
            schema = self._getStateDependentSchema(currentState)
            self._sigslot.reply(schema, self.deviceid)
        else:
            with self._stateChangeLock:
                self._sigslot.reply(self._fullSchema, self.deviceid)

    def slotKillDevice(self):
        senderid = self._sigslot.getSenderInfo(
            "slotKillDevice").getInstanceIdOfSender()
        if senderid == self.serverid and self.serverid != "__none__":
            self.log.INFO("Device is going down as instructed by server")
        else:
            self.log.INFO("Device is going down as instructed by \"{}\""
                          .format(senderid))
            self._sigslot.call(self.serverid, "slotDeviceGone", self.deviceid)
        try:
            self.preDestruction()
        except Exception as e:
            # 'repr(e)' to get both, exception type and text
            self.log.WARN(f"Clean-up failed in slotKillDevice: {repr(e)}")
        finally:
            # TODO:
            # Remove this hack if known how to get rid of the object cleanly
            # (slotInstanceGone will be called in _sigslot destructor again)
            self._sigslot.call("*", "slotInstanceGone", self.deviceid,
                               self._sigslot.getInstanceInfo())

            # This will trigger the central event-loop to finish
            os.kill(os.getpid(), signal.SIGTERM)

    def slotTimeTick(self, id, sec, frac, period):
        epochNow = Epochstamp()
        with self._timeLock:
            self._timeId = id
            self._timeSec = sec
            self._timeFrac = frac
            # Fallback to the local timing ...
            if sec == 0:
                self._timeSec = epochNow.getSeconds()
                self._timeFrac = epochNow.getFractionalSeconds()
            self._timePeriod = period
        self.onTimeUpdate(id, sec, frac, period)

    def slotLoggerPriority(self, newprio):
        oldprio = Logger.getPriority()
        self.set("Logger.priority", newprio)
        Logger.setPriority(newprio)
        self.log.INFO(
            f"Logger Priority changed : {oldprio} ==> {newprio}")

    def getActualTimestamp(self):
        """Returns the actual timestamp.

        The Trainstamp part of Timestamp is extrapolated from the last values
        received via slotTimeTick (or zero if no time ticks received, i.e.
        timeServerId is empty). To receive time ticks, the server of the device
        has to be connected to a time server.

        :return: the actual timestamp
        """
        return self.getTimestamp(Epochstamp())  # i.e. for now

    def getTimestamp(self, epoch):
        """Returns the Timestamp for given Epochstamp.

        The Trainstamp part of Timestamp is extrapolated forward or backward
        from the last values received via slotTimeTick (or zero if no time
        ticks received yet). To receive time ticks, the server of the device
        has to be connected to a time server.

        :param epoch: Epochstamp for that the time stamp is searched for
        :return: the matching Timestamp, consisting of epoch and the
                 corresponding Trainstamp
        """
        resultId = 0
        with self._timeLock:
            if self._timePeriod > 0:
                epochLastReceived = Epochstamp(self._timeSec, self._timeFrac)
                # duration is always positive, irrespective whether epoch or
                # epochLastReceived is earlier
                duration = epoch.elapsed(epochLastReceived)
                nPeriods = (duration.getTotalSeconds() * 1000000
                            + duration.getFractions(MICROSEC)
                            ) // self._timePeriod
                if epochLastReceived <= epoch:
                    resultId = self._timeId + nPeriods
                elif self._timeId >= nPeriods + 1:  # sanity check
                    resultId = self._timeId - nPeriods - 1
                elif self.log:  # if 'log' is not yet initialised
                    self.log.WARN("Bad input: (train)Id zero since epoch = {};"
                                  " from time server: epoch = {}, id = {},"
                                  " period = {} mus"
                                  .format(epoch.toIso8601(),
                                          epochLastReceived.toIso8601(),
                                          self._timeId, self._timePeriod))
        return Timestamp(epoch, Trainstamp(resultId))

    def _getStateDependentSchema(self, state):
        with self._stateChangeLock:
            if state not in self._stateDependentSchema:
                rules = AssemblyRules(AccessType(WRITE), state.value)
                schemaForState = self._fullSchema.subSchemaByRules(rules)
                self._stateDependentSchema[state] = schemaForState
            return self._stateDependentSchema[state]

    def getInstanceId(self):
        return self._sigslot.getInstanceId()

    def registerSlot(self, slotFunc):
        self._sigslot.registerSlot(slotFunc)

    def updateLatencies(self, performanceMeasures):
        if self.get("performanceStatistics.enable"):
            # Keys and values of 'performanceMeasures' are defined in
            # SignalSlotable::updatePerformanceStatistics (C++)
            # and expectedParameters has to foresee this content under node
            # "performanceStatistics".
            self.set(Hash("performanceStatistics", performanceMeasures))

    def onBrokerError(self, message):
        self.log.ERROR(f"Broker consumption problem: {message}")
        # Trigger alarm, but not always a new one (system is busy anyway).
        # By setting messagingProblems up to every second, we can investigate
        # roughly the time of problems via the data logger.
        if (not self["performanceStatistics.messagingProblems"]
                or time.time() - self.lastBrokerErrorStamp >= 1.):
            self["performanceStatistics.messagingProblems"] = True
            self.lastBrokerErrorStamp = time.time()

    def setAlarmCondition(self, condition, **deprecated):
        """Set the global alarm condition

        :param condition: condition to set
        :return: None
        """
        if not isinstance(condition, AlarmCondition):
            raise TypeError("First argument must be 'AlarmCondition',"
                            " not '{}'".format(str(type(condition))))

        timestamp = self.getActualTimestamp()
        with self._stateChangeLock:
            self._setNoStateLock(
                "alarmCondition", condition.asString(),
                timestamp, validate=False)

    def getAlarmCondition(self, key=None, separator="."):
        if key is None:
            return AlarmCondition.fromString(self.get("alarmCondition"))
        else:
            with self._stateChangeLock:
                condition = self._parameters.getAttribute(
                    key, "alarmCondition", separator)
                return AlarmCondition.fromString(condition)

    def hasRollingStatistics(self, key):
        with self._stateChangeLock:
            return self._fullSchema.hasRollingStatistics(key)

    def getRollingStatistics(self, key):
        with self._stateChangeLock:
            # TODO
            # I fear we have to copy here, since 'getRollingStatistics is
            # defined as 'bp::return_internal_reference<>()' in PyUtilSChema.cc
            return self.validatorIntern.getRollingStatistics(key)

    @staticmethod
    def loadConfiguration(cfgFile):
        cfg = loadFromFile(cfgFile)
        os.remove(cfgFile)
        return cfg

    # the following functions expose parts of SignalSlotable to the public
    # device interface.

    def registerSignal(self, signalName, *args):
        """Register a signal to be handles in the remote system

        :param signalName:name of the signal to be registered
        :param args: signature of the signal, e.g. `str, Hash, str`
        """
        self._sigslot.registerSignal(signalName, *args)

    def connect(self, signalInstance, signalName, slotName):
        """Connect a signal with one of our slots

        :param signalInstance: instance the signal is on, use "" for local
        :param signalName: name of the signal to connect
        :param slotName: name of the slot to be executed upon signal reception
        :return whether connection could be established
        """
        return self._sigslot.connect(signalInstance, signalName, slotName)

    def reply(self, *args):
        """Place the reply of a slot being called

        Reply content will not be sent immediately, but when the slot call
        ends. If called more than once, the last call defines the slot reply.

        :param args: list of arguments to reply, maximum length is 4
        """
        self._sigslot.reply(*args)

    def emit(self, signalName, *args):
        """Emit a signal to the remote system

        :param signalName: name of the signal.
        :param args: list of arguments signal is emitted with. Maximum 4
        """
        self._sigslot.emit(signalName, *args)

    def call(self, instanceId, slotName, *args):
        """Call a remote slot with arguments

        :param instanceId: instance of the remote device to call slot on
        :param slotName: name of the slot to call on instanceId
        :param args: list of arguments to call slot with, maximum length is 4
        """
        self._sigslot.call(instanceId, slotName, *args)

    def request(self, instanceId, slotName, *args):
        """Request a reply from a remote slot

        :param instanceId: instance of the remote device to request from
        :param slotName: name of the slot to request from on instanceId
        :param args: list of arguments to call slot with, maximum length is 4
        :return: a `SignalSlotable.Requestor` object handling the reply
        """
        return self._sigslot.request(instanceId, slotName, *args)

    def requestNoWait(self, instanceId, slotName, replyInstance,
                      replySlotName, *args):
        """Request a reply from a remote slot

        :param instanceId: instance of the remote device to request from
        :param slotName: name of the slot to request from on instanceId
        :param replyInstance: instance on which to handle reply, use "" for
                              local device.
        :param replySlotName: slot to call with reply on replyInstance
        :param args: list of arguments to call slot with, maximum length is 4
        :return: a `SignalSlotable.Requestor` object handling the reply
        """
        return self._sigslot.requestNoWait(instanceId, slotName, replyInstance,
                                           replySlotName, *args)

    # Added for backward compatibility when fullSchema => _fullSchema
    @property
    @karabo_deprecated
    def fullSchema(self):
        """
        DEPRECATED - use getFullSchema()
        """
        return self.getFullSchema()

    # Added for backward compatibility when parameters => _parameters
    @property
    @karabo_deprecated
    def parameters(self):
        """
        DEPRECATED
        * for full config use getCurrentConfiguration()
        * maybe what you want is just some key, then use get(some_key)
        """
        return self.getCurrentConfiguration()

fullSchema property

DEPRECATED - use getFullSchema()

parameters property

DEPRECATED * for full config use getCurrentConfiguration() * maybe what you want is just some key, then use get(some_key)

signalSlotable property

Get SignalSlotable object embedded in PythonDevice instance.

KARABO_ON_DATA(channelName, handlerPerData)

Registers a data handler function

This function will be called if data is received on an input channel identified by channelName. The handler function should have the signature:

def onData(data, metaData):
    pass

where data and metaData are both Hashes.

Source code in src/pythonKarabo/karabo/bound/device.py
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
def KARABO_ON_DATA(self, channelName, handlerPerData):
    """Registers a data handler function

    This function will be called if data is received on an input channel
    identified by `channelName`. The handler function should have the
    signature:

        def onData(data, metaData):
            pass

    where `data` and `metaData` are both Hashes.
    """
    self._inputChannelHandlers.setdefault(channelName, [None] * 3)
    self._inputChannelHandlers[channelName][0] = handlerPerData
    self._sigslot.registerDataHandler(channelName, handlerPerData)

KARABO_ON_EOS(channelName, handler)

Registers an end-of-stream handler

Registers a handler to be called if input channel identified by channelName is signaled end-of-stream.

The handler function should have the signature

 def onEos(input):
     pass

where input is a reference to the input channel.

Source code in src/pythonKarabo/karabo/bound/device.py
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
def KARABO_ON_EOS(self, channelName, handler):
    """Registers an end-of-stream handler

    Registers a handler to be called if input channel identified by
    `channelName` is signaled end-of-stream.

    The handler function should  have the signature

         def onEos(input):
             pass

    where `input` is a reference to the input channel.
    """
    self._inputChannelHandlers.setdefault(channelName, [None] * 3)
    self._inputChannelHandlers[channelName][2] = handler
    self._sigslot.registerEndOfStreamHandler(channelName, handler)

KARABO_ON_INPUT(channelName, handlerPerInput)

Registers an input handler function

Registers a handler to be called if data is available on the input channel identified by channelName. It is up to the device developer to read data (in contrast to the KARABO_ON_DATA registration).

def onInput(input):
    for i in range(input.size()):
        data, metaData = input.read(i)

Here input is a reference to the input channel.

Source code in src/pythonKarabo/karabo/bound/device.py
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
def KARABO_ON_INPUT(self, channelName, handlerPerInput):
    """Registers an input handler function

    Registers a handler to be called if data is available on the input
    channel identified by `channelName`. It is up to the device developer
    to read data (in contrast to the `KARABO_ON_DATA` registration).

        def onInput(input):
            for i in range(input.size()):
                data, metaData = input.read(i)

    Here `input` is a reference to the input channel.
    """
    self._inputChannelHandlers.setdefault(channelName, [None] * 3)
    self._inputChannelHandlers[channelName][1] = handlerPerInput
    self._sigslot.registerInputHandler(channelName, handlerPerInput)

KARABO_SLOT(slot, slotName=None, numArgs=None)

Register a slot in the distributed system.

:param slot is the callable to register :param slotname is used to call the slot, 'None' means slot.name, note that a '_' can also be called as '.' as needed for nested slots in the Schema like 'node.slotInNode' :param numArgs number of arguments that the slot has, 'None' means to (try to) deduce from 'slot'

Note that a slot is only connected with a SLOT_ELEMENT if the key of the SLOT_ELEMENT matches the slot name provided to this function.

SLOT_ELEMENT(expected).key("slotDoSomething")

....

self.KARABO_SLOT(slotDoSomething)

.....

def slotDoSomething(self):
    pass

For slots under a node, the method name needs to replace '.' by '_'.

SLOT_ELEMENT(expected).key("node.slotOther")

....

self.KARABO_SLOT(node_slotOther)

....

def node_slotOther(self):
    pass
Source code in src/pythonKarabo/karabo/bound/device.py
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
def KARABO_SLOT(self, slot, slotName=None, numArgs=None):
    """Register a slot in the distributed system.

    :param slot is the callable to register
    :param slotname is used to call the slot, 'None' means slot.__name__,
                    note that a '_' can also be called as '.' as needed
                    for nested slots in the Schema like 'node.slotInNode'
    :param numArgs number of arguments that the slot has,
                   'None' means to (try to) deduce from 'slot'

    Note that a slot is only connected with a SLOT_ELEMENT if the key of
    the SLOT_ELEMENT matches the slot name provided to this function.

        SLOT_ELEMENT(expected).key("slotDoSomething")

        ....

        self.KARABO_SLOT(slotDoSomething)

        .....

        def slotDoSomething(self):
            pass

    For slots under a node, the method name needs to replace '.' by '_'.

        SLOT_ELEMENT(expected).key("node.slotOther")

        ....

        self.KARABO_SLOT(node_slotOther)

        ....

        def node_slotOther(self):
            pass
    """

    if slotName is None:
        if numArgs is None:
            self._sigslot.registerSlot(slot)
        else:
            self._sigslot.registerSlot(slot, numArgs=numArgs)
    elif numArgs is None:
        self._sigslot.registerSlot(slot, slotName)
    else:
        self._sigslot.registerSlot(slot, slotName, numArgs)

__getitem__(key)

Alternative for value = self.get(key): value = self[key]

Source code in src/pythonKarabo/karabo/bound/device.py
768
769
770
def __getitem__(self, key):
    """Alternative for `value = self.get(key)`: `value = self[key]`"""
    return self.get(key)

__init__(configuration)

The initialization method of a device

Expects a configuration passed as a Karabo Hash.

:param configuration: the configuration Hash. It may contain the following entries:

   - _serverId_: there id of the hosting server
   - _deviceId_: a usually autogenerated device id
Source code in src/pythonKarabo/karabo/bound/device.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
def __init__(self, configuration: Hash | None):
    """The initialization method of a device

    Expects a configuration passed as a Karabo Hash.

    :param configuration: the configuration Hash. It may contain the
           following entries:

           - _serverId_: there id of the hosting server
           - _deviceId_: a usually autogenerated device id
    """
    if configuration is None:
        raise ValueError("Configuration must be Hash object, not None")
    super().__init__()

    self._parameters = configuration
    if "_serverId_" in self._parameters:
        self.serverid = self._parameters["_serverId_"]
    else:
        self.serverid = "__none__"

    if "_deviceId_" in self._parameters:
        self.deviceid = self._parameters["_deviceId_"]
    else:
        self.deviceid = "__none__"  # TODO: generate uuid

    # Initialize threading locks...
    self._stateChangeLock = threading.Lock()
    self._stateDependentSchema = {}
    self._injectedSchema = Schema()

    # Initialize _client to None (important!)
    self._client = None

    # Initialize hostName
    if "hostName" not in self._parameters:
        self._parameters["hostName"] = (
            socket.gethostname().partition('.')[0])
    self.hostname = self._parameters["hostName"]

    # timeserver related
    self._timeLock = threading.Lock()
    self._timeId = 0
    self._timeSec = 0
    self._timeFrac = 0
    self._timePeriod = 0

    # Setup the validation classes
    rules = ValidatorValidationRules()
    rules.allowAdditionalKeys = False
    rules.allowMissingKeys = True
    rules.allowUnrootedConfiguration = True
    rules.injectDefaults = False
    rules.injectTimestamps = True
    rules.forceInjectedTimestamp = False  # allows to specify case-by-case
    # Internal validator for set(..)
    self.validatorIntern = Validator(rules)
    rules.forceInjectedTimestamp = True
    # External validator for slotReconfigure(..)
    self.validatorExtern = Validator(rules)

    # For broker error handler
    self.lastBrokerErrorStamp = 0

    self.initClassId()
    self.initSchema()

    with self._stateChangeLock:
        self._parameters.set("classId", self.classid)
        # class version is the (base) module name plus __version__ where
        # the latter comes from KARABO_CLASSINFO decorator and should be
        # the repository version
        clsVers = f"{self.__module__.split('.', 1)[0]}-{self.__version__}"
        self._parameters.set("classVersion", clsVers)
        self._parameters.set("karaboVersion", karaboVersion)
        self._parameters.set("deviceId", self.deviceid)
        self._parameters.set("serverId", self.serverid)
        self._parameters.set("pid", os.getpid())

        # Validate first time to assign timestamps
        # Note that invalid property keys are already caught via
        # Configurator(PythonDevice).create in launchPythonDevice below.
        result, error, validated = self.validatorIntern.validate(
            self._fullSchema, self._parameters,
            self.getActualTimestamp())
        if not result:
            raise RuntimeError(error)
        self._parameters.merge(validated,
                               HashMergePolicy.REPLACE_ATTRIBUTES)

    # Create 'info' hash
    info = Hash("type", "device")
    info["classId"] = self.classid
    info["serverId"] = self.serverid
    info["host"] = self.hostname
    currentState = self["state"]
    if currentState is State.ERROR:
        status = "error"
    elif currentState is State.UNKNOWN:
        status = "unknown"
    else:
        status = "ok"
    info["status"] = status

    # device capabilities are encoded in a bit mask field
    capabilities = 0
    if configuration.has("availableScenes"):
        capabilities |= Capabilities.PROVIDES_SCENES
    if configuration.has("availableMacros"):
        capabilities |= Capabilities.PROVIDES_MACROS
    if configuration.has("interfaces"):
        capabilities |= Capabilities.PROVIDES_INTERFACES

    info["capabilities"] = capabilities

    interfaces = 0
    if configuration.has("interfaces"):
        for description in self.get("interfaces"):
            if description in Interfaces.__members__:
                interfaces |= Interfaces[description]
            else:
                raise NotImplementedError(
                    "Provided interface is not supported: {}".format(
                        description))

        info["interfaces"] = interfaces

    # Setup device logger (needs self._parameters) before SignalSlotable
    # to log e.g. broker setup (i.e. logging must not log to broker).
    self.loadLogger()
    self.log = Logger.getCategory(self.deviceid)

    # Instantiate SignalSlotable object
    self._sigslot = SignalSlotable(
        self.deviceid, PythonDevice.connectionParams,
        self._parameters["heartbeatInterval"], info)

    # Initialize Device slots and instantiate all channels
    self._initDeviceSlots()
    self._inputChannelHandlers = {}  # for re-injected InputChannel

    self.initChannels()

    # Register guard for slot calls
    self._sigslot.registerSlotCallGuardHandler(self.slotCallGuard)

    # Register updateLatencies handler
    self._sigslot.registerPerformanceStatisticsHandler(
        self.updateLatencies)

    self._sigslot.registerBrokerErrorHandler(self.onBrokerError)

    # Initial functions one can register
    self._func = []

__setitem__(key, value)

Alternative to self.set: self[key] = value

The timestamp is set to the current timestamp

Source code in src/pythonKarabo/karabo/bound/device.py
683
684
685
686
687
688
def __setitem__(self, key, value):
    """Alternative to `self.set`: `self[key] = value`

    The timestamp is set to the current timestamp
    """
    self.set(key, value, self.getActualTimestamp())

aliasHasKey(alias)

Check if a key for a given alias exists

Source code in src/pythonKarabo/karabo/bound/device.py
1023
1024
1025
1026
def aliasHasKey(self, alias):
    """Check if a key for a given alias exists"""
    with self._stateChangeLock:
        return self._fullSchema.aliasHasKey(alias)

allowLock()

Overwrite this function for service devices that cannot be locked :return:

Source code in src/pythonKarabo/karabo/bound/device.py
1443
1444
1445
1446
1447
1448
def allowLock(self):
    """
    Overwrite this function for service devices that cannot be locked
    :return:
    """
    return True

appendSchema(schema)

Append to the existing device schema

If a property is being reinjected, and of the same type, then it will keep its current value. If it does not fit within range, an error will be raised. Likewise, if the type changes, and the value cannot be cast, an error will be raised.

Input and output channels will be created if injected. An output channel is also recreated if its schema changes, to make the other end aware. Note that for newly created input channels there are no data, input and end-of-stream handlers registered. This has to be done via the corresponding self.KARABO_ON_[DATA|INPUT|EOS] methods. If an InputChannel is re-injected, its handlers are kept.

:param schema: to append to current full schema

Source code in src/pythonKarabo/karabo/bound/device.py
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
def appendSchema(self, schema):
    """Append to the existing device schema

    If a property is being reinjected, and of the same type, then it will
    keep its current value. If it does not fit within range, an error will
    be raised.
    Likewise, if the type changes, and the value cannot be cast, an error
    will be raised.

    Input and output channels will be created if injected.
    An output channel is also recreated if its schema changes, to make the
    other end aware.
    Note that for newly created input channels there are no data, input
    and end-of-stream handlers registered. This has to be done
    via the corresponding self.KARABO_ON_[DATA|INPUT|EOS] methods.
    If an InputChannel is re-injected, its handlers are kept.

    :param schema: to append to current full schema
    """
    rules = ValidatorValidationRules()
    rules.allowAdditionalKeys = True
    rules.allowMissingKeys = True
    rules.allowUnrootedConfiguration = True
    rules.injectDefaults = True
    rules.injectTimestamps = True
    validator = Validator()
    validator.setValidationRules(rules)
    _, _, validated = validator.validate(schema, Hash(),
                                         self.getActualTimestamp())

    with self._stateChangeLock:
        # Take care of OutputChannels schema changes
        outChannelsToRecreate = set()
        for path in self._sigslot.getOutputChannelNames():
            if (self._fullSchema.has(path) and schema.has(path)
                and (not schema.hasClassId(path) or
                     schema.getClassId(path) != "OutputChannel")):
                # maybe output schema change without using OUTPUT_CHANNEL
                outChannelsToRecreate.add(path)
            # elif schema.getClassId(path) == "OutputChannel":
            #      will be recreated by _initChannels(schema) below

        self._stateDependentSchema = {}
        self._injectedSchema += schema

        prevFullSchemaLeaves = [p for p in self._fullSchema.getPaths()
                                if not self._fullSchema.isNode(p)]
        self._fullSchema.copy(self._staticSchema)
        self._fullSchema += self._injectedSchema

        # notify the distributed system...
        self._sigslot.emit("signalSchemaUpdated", self._fullSchema,
                           self.deviceid)

        # Keep new leaves only. This hash is then set, to avoid re-sending
        # updates with the same value.
        for path in prevFullSchemaLeaves:
            validated.erasePath(path)
        self._setNoStateLock(validated)

        # Init any freshly injected channels
        self._initChannels(topLevel="", schema=schema)
        # ... and those output channels with potential Schema change
        for outToCreate in outChannelsToRecreate:
            self.log.INFO("appendSchema triggers creation of output "
                          f"channel '{outToCreate}'")
            self._prepareOutputChannel(outToCreate)

    self.log.INFO("Schema appended")

appendSchemaMaxSize(path, value, emitFlag=True)

Append Schema to change/set maximum size information for path. If paths does not exist, raise KeyError.

This is similar to the more general appendSchema, but dedicated to a common use case.

:param path indicates the parameter which should be a Vector- or TableElement :param value is the new maximum size of the element :param emitFlag indicates if others should be informed about this Schema update. If this method is called for a bunch of paths, it is recommended to set this to True only for the last call.

Source code in src/pythonKarabo/karabo/bound/device.py
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
def appendSchemaMaxSize(self, path, value, emitFlag=True):
    """
    Append Schema to change/set maximum size information for path.
    If paths does not exist, raise KeyError.

    This is similar to the more general appendSchema, but dedicated to a
    common use case.

    :param path  indicates the parameter which should be a
                 Vector- or TableElement
    :param value is the new maximum size of the element
    :param emitFlag indicates if others should be informed about this
                    Schema update. If this method is called for a bunch of
                    paths, it is recommended to set this to True only for
                    the last call.
    """
    with self._stateChangeLock:
        if not self._fullSchema.has(path):
            raise KeyError("Path '{}' not found in the device schema."
                           .format(path))

        self._stateDependentSchema = {}
        # Do not touch static schema - that must be restorable via
        # updateSchema(Schema())
        # OVERWRITE_ELEMENT checks whether max size attribute makes sense
        # for path
        (OVERWRITE_ELEMENT(self._fullSchema).key(path)
         .setNewMaxSize(value).commit(),)
        if self._injectedSchema.has(path):
            (OVERWRITE_ELEMENT(self._injectedSchema).key(path)
             .setNewMaxSize(value).commit(),)

        if emitFlag:
            self._sigslot.emit("signalSchemaUpdated",
                               self._fullSchema, self.deviceid)

call(instanceId, slotName, *args)

Call a remote slot with arguments

:param instanceId: instance of the remote device to call slot on :param slotName: name of the slot to call on instanceId :param args: list of arguments to call slot with, maximum length is 4

Source code in src/pythonKarabo/karabo/bound/device.py
1749
1750
1751
1752
1753
1754
1755
1756
def call(self, instanceId, slotName, *args):
    """Call a remote slot with arguments

    :param instanceId: instance of the remote device to call slot on
    :param slotName: name of the slot to call on instanceId
    :param args: list of arguments to call slot with, maximum length is 4
    """
    self._sigslot.call(instanceId, slotName, *args)

connect(signalInstance, signalName, slotName)

Connect a signal with one of our slots

:param signalInstance: instance the signal is on, use "" for local :param signalName: name of the signal to connect :param slotName: name of the slot to be executed upon signal reception :return whether connection could be established

Source code in src/pythonKarabo/karabo/bound/device.py
1721
1722
1723
1724
1725
1726
1727
1728
1729
def connect(self, signalInstance, signalName, slotName):
    """Connect a signal with one of our slots

    :param signalInstance: instance the signal is on, use "" for local
    :param signalName: name of the signal to connect
    :param slotName: name of the slot to be executed upon signal reception
    :return whether connection could be established
    """
    return self._sigslot.connect(signalInstance, signalName, slotName)

emit(signalName, *args)

Emit a signal to the remote system

:param signalName: name of the signal. :param args: list of arguments signal is emitted with. Maximum 4

Source code in src/pythonKarabo/karabo/bound/device.py
1741
1742
1743
1744
1745
1746
1747
def emit(self, signalName, *args):
    """Emit a signal to the remote system

    :param signalName: name of the signal.
    :param args: list of arguments signal is emitted with. Maximum 4
    """
    self._sigslot.emit(signalName, *args)

filterByTags(configuration, tags)

Filter a given configuration Hash by tags

:param configuration: :param tags: a string, with several entries separated by commas spaces or semicolons :return: the filtered configuration Hash

Source code in src/pythonKarabo/karabo/bound/device.py
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
def filterByTags(self, configuration, tags):
    """Filter a given configuration Hash by tags

    :param configuration:
    :param tags: a string, with several entries separated by commas
                 spaces or semicolons
    :return: the filtered configuration Hash
    """
    with self._stateChangeLock:
        return HashFilter.byTag(self._fullSchema, configuration,
                                tags, " ,;")

get(key)

Return a property of this device

:param key: as defined in the expected parameter section :return: the value of the property

Source code in src/pythonKarabo/karabo/bound/device.py
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
def get(self, key):
    """Return a property of this device

    :param key: as defined in the expected parameter section
    :return: the value of the property
    """
    with self._stateChangeLock:
        try:
            result = self._parameters[key]
            if not self._fullSchema.hasClassId(key):
                classId = None
            else:
                classId = self._fullSchema.getClassId(key)
            if classId == KARABO_CLASS_ID_STATE:
                return State(result)
            elif classId == KARABO_CLASS_ID_ALARM:
                return AlarmCondition(result)
            elif isinstance(result, (Hash, VectorHash)):
                # For Hash and VectorHash, 'result' is a reference, so if
                # it would be returned and the returned object would be
                # changed, self._parameters would be changed as well, pro-
                # viding a back door without using self._stateChangeLock!
                return copy.copy(result)
            else:
                # Note that vectors of numbers are copies
                return result
        except RuntimeError as e:
            print(e)
            raise AttributeError(
                f"Error while retrieving '{key}' from device")

getActualTimestamp()

Returns the actual timestamp.

The Trainstamp part of Timestamp is extrapolated from the last values received via slotTimeTick (or zero if no time ticks received, i.e. timeServerId is empty). To receive time ticks, the server of the device has to be connected to a time server.

:return: the actual timestamp

Source code in src/pythonKarabo/karabo/bound/device.py
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
def getActualTimestamp(self):
    """Returns the actual timestamp.

    The Trainstamp part of Timestamp is extrapolated from the last values
    received via slotTimeTick (or zero if no time ticks received, i.e.
    timeServerId is empty). To receive time ticks, the server of the device
    has to be connected to a time server.

    :return: the actual timestamp
    """
    return self.getTimestamp(Epochstamp())  # i.e. for now

getAliasFromKey(key, aliasReferenceType)

Return the alias of a key :param key: to return the alias from :param aliasReferenceType: type the alias is of :return: an object of aliasReferenceType

Source code in src/pythonKarabo/karabo/bound/device.py
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
def getAliasFromKey(self, key, aliasReferenceType):
    """
    Return the alias of a key
    :param key: to return the alias from
    :param aliasReferenceType: type the alias is of
    :return: an object of aliasReferenceType
    """
    try:
        with self._stateChangeLock:
            return self._fullSchema.getAliasFromKey(key,
                                                    aliasReferenceType)
    except RuntimeError as e:
        raise AttributeError("Error while retrieving alias from parameter"
                             " ({}): {}".format(key, e))

getAvailableInstances()

Return available instances in the distributed system

Source code in src/pythonKarabo/karabo/bound/device.py
1094
1095
1096
1097
def getAvailableInstances(self):
    """Return available instances in the distributed system"""

    return self._sigslot.getAvailableInstances()

getCurrentConfiguration(tags='')

Return the current configuration, optionally filtered by tags

:param tags: a string, with several entries separated by commas spaces or semicolons. Set to an empty string if no filtering is to be applied. :return: a configuration Hash

Source code in src/pythonKarabo/karabo/bound/device.py
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
def getCurrentConfiguration(self, tags=""):
    """Return the current configuration, optionally filtered by tags

    :param tags: a string, with several entries separated by commas
                spaces or semicolons. Set to an empty string if no
                filtering is to be applied.
    :return: a configuration Hash
    """
    with self._stateChangeLock:
        if tags == "":
            # Outside the state change lock we need a copy:
            return copy.copy(self._parameters)
        else:
            return HashFilter.byTag(self._fullSchema, self._parameters,
                                    tags, " ,;")

getCurrentConfigurationSlice(paths)

Retrieves a slice of the current configuration.

:param paths: of the configuration which should be returned (as declared in expectedParameters, method throws if a non-existing path is given) :return: Hash with the current values and attributes (e.g. timestamp) of the selected configuration

Source code in src/pythonKarabo/karabo/bound/device.py
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
def getCurrentConfigurationSlice(self, paths):
    """Retrieves a slice of the current configuration.

    :param paths: of the configuration which should be returned
                  (as declared in expectedParameters,
                   method throws if a non-existing path is given)
    :return: Hash with the current values and attributes (e.g. timestamp)
             of the selected configuration
    """
    result = Hash()
    with self._stateChangeLock:
        for p in paths:
            node = self._parameters.getNode(p)
            # with normal set, type deduction for empty vector may fail
            result.setAs(p, node.getValue(), node.getType())
            newNode = result.getNode(p)
            newNode.setAttributes(node.getAttributes())

    return result

getFullSchema()

Return the full schema describing this device

:return: a karabo Schema object

Source code in src/pythonKarabo/karabo/bound/device.py
772
773
774
775
776
777
778
779
780
781
def getFullSchema(self):
    """Return the full schema describing this device

    :return: a karabo Schema object
    """
    # Have to copy to protect using it while updating
    s = Schema()
    with self._stateChangeLock:
        s.copy(self._fullSchema)
    return s

getKeyFromAlias(alias)

Return the key mapping to a given alias

Source code in src/pythonKarabo/karabo/bound/device.py
1014
1015
1016
1017
1018
1019
1020
1021
def getKeyFromAlias(self, alias):
    """Return the key mapping to a given alias"""
    try:
        with self._stateChangeLock:
            return self._fullSchema.getKeyFromAlias(alias)
    except RuntimeError as e:
        raise AttributeError("Error while retrieving parameter from alias"
                             " ({}): {}".format(alias, e))

getServerId()

Return the id of the server hosting this devices

Source code in src/pythonKarabo/karabo/bound/device.py
1089
1090
1091
1092
def getServerId(self):
    """Return the id of the server hosting this devices"""

    return self.serverid

getTimestamp(epoch)

Returns the Timestamp for given Epochstamp.

The Trainstamp part of Timestamp is extrapolated forward or backward from the last values received via slotTimeTick (or zero if no time ticks received yet). To receive time ticks, the server of the device has to be connected to a time server.

:param epoch: Epochstamp for that the time stamp is searched for :return: the matching Timestamp, consisting of epoch and the corresponding Trainstamp

Source code in src/pythonKarabo/karabo/bound/device.py
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
def getTimestamp(self, epoch):
    """Returns the Timestamp for given Epochstamp.

    The Trainstamp part of Timestamp is extrapolated forward or backward
    from the last values received via slotTimeTick (or zero if no time
    ticks received yet). To receive time ticks, the server of the device
    has to be connected to a time server.

    :param epoch: Epochstamp for that the time stamp is searched for
    :return: the matching Timestamp, consisting of epoch and the
             corresponding Trainstamp
    """
    resultId = 0
    with self._timeLock:
        if self._timePeriod > 0:
            epochLastReceived = Epochstamp(self._timeSec, self._timeFrac)
            # duration is always positive, irrespective whether epoch or
            # epochLastReceived is earlier
            duration = epoch.elapsed(epochLastReceived)
            nPeriods = (duration.getTotalSeconds() * 1000000
                        + duration.getFractions(MICROSEC)
                        ) // self._timePeriod
            if epochLastReceived <= epoch:
                resultId = self._timeId + nPeriods
            elif self._timeId >= nPeriods + 1:  # sanity check
                resultId = self._timeId - nPeriods - 1
            elif self.log:  # if 'log' is not yet initialised
                self.log.WARN("Bad input: (train)Id zero since epoch = {};"
                              " from time server: epoch = {}, id = {},"
                              " period = {} mus"
                              .format(epoch.toIso8601(),
                                      epochLastReceived.toIso8601(),
                                      self._timeId, self._timePeriod))
    return Timestamp(epoch, Trainstamp(resultId))

getValueType(key)

Get the ValueType of a given key

:returns: The type in terms of karabo::util::ReferenceTypes

Source code in src/pythonKarabo/karabo/bound/device.py
1033
1034
1035
1036
1037
1038
1039
def getValueType(self, key):
    """Get the ValueType of a given key

    :returns: The type in terms of `karabo::util::ReferenceTypes`
    """
    with self._stateChangeLock:
        return self._fullSchema.getValueType(key)

initChannels(topLevel='', schema=None)

Initialise Input-/OutputChannels :param schema to recurse for channels - if None, use self._fullSchema :param topLevel is path in schema hierarchy where to start recursion

Source code in src/pythonKarabo/karabo/bound/device.py
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
def initChannels(self, topLevel="", schema=None):
    """
    Initialise Input-/OutputChannels
    :param schema to recurse for channels - if None, use self._fullSchema
    :param topLevel is path in schema hierarchy where to start recursion
    """
    with self._stateChangeLock:
        if schema is None:
            schema = self._fullSchema
        self._initChannels(topLevel, schema)

keyHasAlias(key)

Check if a given key has an alias defined

Source code in src/pythonKarabo/karabo/bound/device.py
1028
1029
1030
1031
def keyHasAlias(self, key):
    """Check if a given key has an alias defined"""
    with self._stateChangeLock:
        return self._fullSchema.keyHasAlias(key)

loadLogger()

Load the distributed logger

Uses config in self._parameters["Logger"] and PythonDevice._loggerCfg

Source code in src/pythonKarabo/karabo/bound/device.py
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
def loadLogger(self):
    """Load the distributed logger

    Uses config in self._parameters["Logger"] and PythonDevice._loggerCfg
    """
    # Take cfg as passed from server and merge device specific settings
    if PythonDevice._loggerCfg is None:  # for now if started from MDL
        config = self._parameters["Logger"]
    else:
        config = copy.copy(PythonDevice._loggerCfg)
        config.merge(self._parameters["Logger"])

    # Cure the file name of file logger: own dir inside server's log dir:
    if 'file.filename' in config:
        serverLogDir = os.path.dirname(config['file.filename'])
        path = os.path.join(serverLogDir, self.deviceid)
    else:  # Again, for now if started from MDL
        path = os.path.join(os.environ["KARABO"], "var", "log",
                            self.serverid, self.deviceid)
    if not os.path.isdir(path):
        os.makedirs(path)
    path = os.path.join(path, 'device.log')
    config.set('file.filename', path)

    # finally configure the logger
    Logger.configure(config)
    Logger.useOstream()
    Logger.useFile()
    Logger.useCache()

noStateTransition(currentState, currentEvent)

This function is called if a requested state transition is not allowed in the current context. Usually, this means you have an error in your state machine.

Source code in src/pythonKarabo/karabo/bound/device.py
1167
1168
1169
1170
1171
1172
1173
1174
1175
def noStateTransition(self, currentState, currentEvent):
    """
    This function is called if a requested state transition is not allowed
    in the current context. Usually, this means you have an error in your
    state machine.
    """
    self.log.WARN("Device \"{}\" being in state '{}' does not allow the"
                  " transition for event '{}'."
                  .format(self.deviceid, currentState, currentEvent))

onTimeUpdate(id, sec, frac, period)

Called when an update from the time server is received

:param id: train id :param sec: seconds :param frac: fractional seconds :param period: :return:

Source code in src/pythonKarabo/karabo/bound/device.py
1177
1178
1179
1180
1181
1182
1183
1184
1185
def onTimeUpdate(self, id, sec, frac, period):
    """Called when an update from the time server is received

    :param id: train id
    :param sec: seconds
    :param frac: fractional seconds
    :param period:
    :return:
    """

postReconfigure()

Use this hook to react on configuration changes after they have been validated and applied to the device, and have been notified to the distributed system.

Source code in src/pythonKarabo/karabo/bound/device.py
1106
1107
1108
1109
1110
1111
def postReconfigure(self):
    """
    Use this hook to react on configuration changes after they have been
    validated and applied to the device, and have been notified to the
    distributed system.
    """

preDestruction()

Use this hook if you need to perform clean-up actions before a device gets destroyed.

Source code in src/pythonKarabo/karabo/bound/device.py
1113
1114
1115
1116
1117
def preDestruction(self):
    """
    Use this hook if you need to perform clean-up actions before a device
    gets destroyed.
    """

preReconfigure(incomingReconfiguration)

Use this hook to alter a configuration Hash before it gets applied to the device and the distributed system is notified of the change. :param incomingReconfiguration:

Source code in src/pythonKarabo/karabo/bound/device.py
1099
1100
1101
1102
1103
1104
def preReconfigure(self, incomingReconfiguration):
    """
    Use this hook to alter a configuration Hash before it gets applied to
    the device and the distributed system is notified of the change.
    :param incomingReconfiguration:
    """

registerSignal(signalName, *args)

Register a signal to be handles in the remote system

:param signalName:name of the signal to be registered :param args: signature of the signal, e.g. str, Hash, str

Source code in src/pythonKarabo/karabo/bound/device.py
1713
1714
1715
1716
1717
1718
1719
def registerSignal(self, signalName, *args):
    """Register a signal to be handles in the remote system

    :param signalName:name of the signal to be registered
    :param args: signature of the signal, e.g. `str, Hash, str`
    """
    self._sigslot.registerSignal(signalName, *args)

remote()

Return a DeviceClient instance.

The DeviceClient will use this device's SignalSlotable to interact with the distributed system :return:

Source code in src/pythonKarabo/karabo/bound/device.py
520
521
522
523
524
525
526
527
528
529
530
def remote(self):
    """Return a DeviceClient instance.

    The DeviceClient will use this device's SignalSlotable to interact with
    the distributed system
    :return:
    """
    if self._client is None:
        # SignalSlotable object for reuse
        self._client = DeviceClient(self._sigslot)
    return self._client

reply(*args)

Place the reply of a slot being called

Reply content will not be sent immediately, but when the slot call ends. If called more than once, the last call defines the slot reply.

:param args: list of arguments to reply, maximum length is 4

Source code in src/pythonKarabo/karabo/bound/device.py
1731
1732
1733
1734
1735
1736
1737
1738
1739
def reply(self, *args):
    """Place the reply of a slot being called

    Reply content will not be sent immediately, but when the slot call
    ends. If called more than once, the last call defines the slot reply.

    :param args: list of arguments to reply, maximum length is 4
    """
    self._sigslot.reply(*args)

request(instanceId, slotName, *args)

Request a reply from a remote slot

:param instanceId: instance of the remote device to request from :param slotName: name of the slot to request from on instanceId :param args: list of arguments to call slot with, maximum length is 4 :return: a SignalSlotable.Requestor object handling the reply

Source code in src/pythonKarabo/karabo/bound/device.py
1758
1759
1760
1761
1762
1763
1764
1765
1766
def request(self, instanceId, slotName, *args):
    """Request a reply from a remote slot

    :param instanceId: instance of the remote device to request from
    :param slotName: name of the slot to request from on instanceId
    :param args: list of arguments to call slot with, maximum length is 4
    :return: a `SignalSlotable.Requestor` object handling the reply
    """
    return self._sigslot.request(instanceId, slotName, *args)

requestNoWait(instanceId, slotName, replyInstance, replySlotName, *args)

Request a reply from a remote slot

:param instanceId: instance of the remote device to request from :param slotName: name of the slot to request from on instanceId :param replyInstance: instance on which to handle reply, use "" for local device. :param replySlotName: slot to call with reply on replyInstance :param args: list of arguments to call slot with, maximum length is 4 :return: a SignalSlotable.Requestor object handling the reply

Source code in src/pythonKarabo/karabo/bound/device.py
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
def requestNoWait(self, instanceId, slotName, replyInstance,
                  replySlotName, *args):
    """Request a reply from a remote slot

    :param instanceId: instance of the remote device to request from
    :param slotName: name of the slot to request from on instanceId
    :param replyInstance: instance on which to handle reply, use "" for
                          local device.
    :param replySlotName: slot to call with reply on replyInstance
    :param args: list of arguments to call slot with, maximum length is 4
    :return: a `SignalSlotable.Requestor` object handling the reply
    """
    return self._sigslot.requestNoWait(instanceId, slotName, replyInstance,
                                       replySlotName, *args)

set(*args, **kwargs)

Updates device properties and notifies any observers. Note that an update of the device "state" property must be done using updateState(..).

args: can be of length

* one: expects a Hash, and uses current timestamp
* two: expects a key, value pair and uses current timestamp or a
       Hash, timestamp pair
* three: expects key, value and timestamp
validate: specifies if validation of args should be performed

before notification. Skipping validation should not be used with State or AlarmCondition.

If a Hash is provided, its keys should be device properties and the values should have the proper types. A State or AlarmCondition inside a Hash should be given as a string.

Source code in src/pythonKarabo/karabo/bound/device.py
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
def set(self, *args, **kwargs):
    """Updates device properties and notifies any observers.
    Note that an update of the device "state" property must be done using
    updateState(..).

    args: can be of length

        * one: expects a Hash, and uses current timestamp
        * two: expects a key, value pair and uses current timestamp or a
               Hash, timestamp pair
        * three: expects key, value and timestamp

    kwargs: validate: specifies if validation of args should be performed
            before notification. Skipping validation should not be used
            with State or AlarmCondition.

    If a Hash is provided, its keys should be device properties and the
    values should have the proper types. A State or AlarmCondition inside
    a Hash should be given as a string.
    """
    with self._stateChangeLock:
        self._setNoStateLock(*args, **kwargs)

setAlarmCondition(condition, **deprecated)

Set the global alarm condition

:param condition: condition to set :return: None

Source code in src/pythonKarabo/karabo/bound/device.py
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
def setAlarmCondition(self, condition, **deprecated):
    """Set the global alarm condition

    :param condition: condition to set
    :return: None
    """
    if not isinstance(condition, AlarmCondition):
        raise TypeError("First argument must be 'AlarmCondition',"
                        " not '{}'".format(str(type(condition))))

    timestamp = self.getActualTimestamp()
    with self._stateChangeLock:
        self._setNoStateLock(
            "alarmCondition", condition.asString(),
            timestamp, validate=False)

setVectorUpdate(key, updates, updateType, timestamp=None)

Concurrency safe update of vector property (not for tables)

:param key: key of the vector property to update :param updates: iterable of items to remove from property vector (starting at the front) or to add (at the end) :param updateType: indicates update type, applied individually to all items in 'updates', one of "add", "addIfNotIn", "removeOne", "removeAll" :param timestamp: optional timestamp to assign to updated vector property, defaults to self.getActualTimestamp()

Source code in src/pythonKarabo/karabo/bound/device.py
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
def setVectorUpdate(self, key, updates, updateType, timestamp=None):
    """Concurrency safe update of vector property (not for tables)

    :param key: key of the vector property to update
    :param updates: iterable of items to remove from property vector
                   (starting at the front) or to add (at the end)
    :param updateType: indicates update type, applied individually to all
                       items in 'updates',
                       one of "add", "addIfNotIn", "removeOne", "removeAll"
    :param timestamp: optional timestamp to assign to updated vector
                      property, defaults to self.getActualTimestamp()
    """
    if timestamp is None:
        timestamp = self.getActualTimestamp()

    with self._stateChangeLock:
        # vec is a copy, so we are safe if _setNoStateLock raises
        vec = self._parameters.get(key)
        if updateType == "add":
            vec.extend(updates)
        else:
            for update in updates:
                if updateType == "addIfNotIn":
                    if update not in vec:
                        vec.append(update)
                elif updateType == "removeOne":
                    if update in vec:
                        vec.remove(update)
                elif updateType == "removeAll":
                    for _ in range(vec.count(update)):
                        vec.remove(update)
                else:
                    raise ValueError(f"Unknown updateType '{updateType}'")
        # Finally update the property
        self._setNoStateLock(key, vec, timestamp)

signalEndOfStream(channelName)

Signal an end-of-stream event

The channel is identified by channnelName

Note: The methods 'writeChannel(..)' and 'signalEndOfStream(..)' must not be called concurrently.

Source code in src/pythonKarabo/karabo/bound/device.py
726
727
728
729
730
731
732
733
734
735
def signalEndOfStream(self, channelName):
    """Signal an end-of-stream event

    The channel is identified by `channnelName`

    Note:
    The methods 'writeChannel(..)' and 'signalEndOfStream(..)'
    must not be called concurrently.
    """
    self._sigslot.getOutputChannel(channelName).signalEndOfStream()

slotClearLock()

Clear the lock on this device

Source code in src/pythonKarabo/karabo/bound/device.py
1450
1451
1452
1453
def slotClearLock(self):
    """ Clear the lock on this device
    """
    self.set("lockedBy", "")

slotGetTime(info)

Return the actual time information of this device

:param info: An empty place holder hash

This slot returns a Hash with:

  • key time and the attributes provide an actual timestamp with train Id information
  • key timeServerId to show the configured time server
  • key reference and the attributes provide the latest received timestamp information from the timeserver
Source code in src/pythonKarabo/karabo/bound/device.py
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
def slotGetTime(self, info):
    """
    Return the actual time information of this device

    :param info: An empty place holder hash

    This slot returns a Hash with:

    - key ``time`` and the attributes provide an actual
    timestamp with train Id information
    - key ``timeServerId`` to show the configured time server
    - key ``reference`` and the attributes provide the latest
    received timestamp information from the timeserver
    """
    result = Hash()

    result.set("time", True)
    stamp = self.getActualTimestamp()
    stamp.toHashAttributes(result.getAttributes("time"))

    # Provide a nice output for the time server Id
    timeServer = 'None' if not self.timeServerId else self.timeServerId
    result.set("timeServerId", timeServer)

    # And the last reference stamp received
    result.set("reference", True)

    with self._timeLock:
        epoch = Epochstamp(self._timeSec, self._timeFrac)
        train = Trainstamp(self._timeId)
        stamp = Timestamp(epoch, train)

    attrs = result.getAttributes("reference")
    stamp.toHashAttributes(attrs)

    self.reply(result)

slotLoggerContent(info)

Slot call to receive logger content from the print logger

This slot is similar to slotLoggerContent for servers except that the serverId key is substituted with key deviceId.

look in the device_server module for detailed informations

Source code in src/pythonKarabo/karabo/bound/device.py
670
671
672
673
674
675
676
677
678
679
680
681
def slotLoggerContent(self, info):
    """Slot call to receive logger content from the print logger

    This slot is similar to `slotLoggerContent` for servers except that
    the `serverId` key is substituted with key `deviceId`.

    look in the device_server module for detailed informations
    """
    nMessages = info.get("logs", default=KARABO_LOGGER_CONTENT_DEFAULT)
    content = Logger.getCachedContent(nMessages)
    self._sigslot.reply(
        Hash("deviceId", self.deviceid, "content", content))

startInitialFunctions()

Start initial functions: second constructors(?)

Source code in src/pythonKarabo/karabo/bound/device.py
430
431
432
433
434
435
def startInitialFunctions(self):
    """Start initial functions: second constructors(?)"""
    # call initial function registered in the device constructor
    # in registration's order
    for f in self._func:
        f()

updateSchema(schema)

Updates the existing device schema It merges the schema in argument to the static schema defined in expectedParameters, removing any previous schema injections.

If a property is being reinjected, and of the same type, then it will keep its current value. If it does not fit within range, an error will be raised. Likewise, if the type changes, and the value cannot be cast, an error will be raised.

Input and output channels will be created if injected and removed again in case updateSchema is called again without them. An output channel is also recreated if its schema changes. Note that for newly created input channels there are no data, input and end-of-stream handlers registered. This has to be done via the corresponding self.KARABO_ON_[DATA|INPUT|EOS] methods. If an InputChannel is re-injected, its handlers are kept.

:param schema: to be merged with the static schema

Source code in src/pythonKarabo/karabo/bound/device.py
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
def updateSchema(self, schema):
    """Updates the existing device schema
    It merges the schema in argument to the static schema defined in
    expectedParameters, removing any previous schema injections.

    If a property is being reinjected, and of the same type, then it will
    keep its current value. If it does not fit within range, an error will
    be raised.
    Likewise, if the type changes, and the value cannot be cast, an error
    will be raised.

    Input and output channels will be created if injected and removed again
    in case updateSchema is called again without them.
    An output channel is also recreated if its schema changes.
    Note that for newly created input channels there are no data, input
    and end-of-stream handlers registered. This has to be done
    via the corresponding self.KARABO_ON_[DATA|INPUT|EOS] methods.
    If an InputChannel is re-injected, its handlers are kept.

    :param schema: to be merged with the static schema
    """
    rules = ValidatorValidationRules()
    rules.allowAdditionalKeys = True
    rules.allowMissingKeys = True
    rules.allowUnrootedConfiguration = True
    rules.injectDefaults = True
    rules.injectTimestamps = True
    validator = Validator()
    validator.setValidationRules(rules)
    _, _, validated = validator.validate(schema, Hash(),
                                         self.getActualTimestamp())

    with self._stateChangeLock:
        for path in self._injectedSchema.getPaths():
            if not (self._staticSchema.has(path) or schema.has(path)):
                self._parameters.erasePath(path)
                # Now we might have removed 'n.m.l.c' completely although
                # 'n.m' is in static schema - restore (empty) node 'n.m':
                pathSplit = path.split('.')
                for i in range(1, len(pathSplit)):
                    p = ".".join(pathSplit[0:-i])  # 'n.m.l', 'n.m', 'n'
                    if (self._staticSchema.has(p)
                            and not self._parameters.has(p)):
                        self._parameters[p] = Hash()
                        # 'n.m' added added back (after 'n.m.l' failed)
                        break

        self._stateDependentSchema.clear()

        prevFullSchemaLeaves = [p for p in self._fullSchema.getPaths()
                                if not self._fullSchema.isNode(p)]

        # Erase previously present injected InputChannels
        for inChannel in self._sigslot.getInputChannelNames():
            if self._staticSchema.has(inChannel):
                # Do not touch static one
                # (even if re-injected to change properties).
                continue
            if self._injectedSchema.has(inChannel):
                self.log.INFO("updateSchema: Remove input channel '"
                              f"{inChannel}'")
                self._sigslot.removeInputChannel(inChannel)
                if not schema.has(inChannel):
                    # not re-injected - clear handler back-up
                    del self._inputChannelHandlers[inChannel]
        # Treat injected OutputChannels
        outChannelsToRecreate = set()
        for outChannel in self._sigslot.getOutputChannelNames():
            if self._injectedSchema.has(outChannel):
                if self._staticSchema.has(outChannel):
                    # Channel changes its schema back to its default
                    outChannelsToRecreate.add(outChannel)
                else:
                    # Previously injected channel has to be removed
                    self.log.INFO("updateSchema: Remove output channel '"
                                  f"{outChannel}'")
                    self._sigslot.removeOutputChannel(outChannel)
            if (self._staticSchema.has(outChannel)
                and schema.has(outChannel)
                and (not schema.hasClassId(outChannel)
                     or schema.getClassId(outChannel)
                     != "OutputChannel"
                     )):
                outChannelsToRecreate.add(outChannel)

        self._injectedSchema.copy(schema)
        self._fullSchema.copy(self._staticSchema)
        self._fullSchema += self._injectedSchema

        # notify the distributed system...
        self._sigslot.emit("signalSchemaUpdated",
                           self._fullSchema, self.deviceid)

        # Keep new leaves only. This hash is then set, to avoid re-sending
        # updates with the same value.
        for path in prevFullSchemaLeaves:
            validated.erasePath(path)

        self._setNoStateLock(validated)

        # Init any freshly injected channels
        self._initChannels(topLevel="", schema=self._injectedSchema)
        # ... and those with potential Schema change
        for outToCreate in outChannelsToRecreate:
            self.log.INFO("updateSchema triggers creation of output "
                          f"channel '{outToCreate}'")
            self._prepareOutputChannel(outToCreate)

    self.log.INFO("Schema updated")

updateState(newState, propertyUpdates=None, timestamp=None)

Update the state property of the device to a new state.

:param newState: the state to set the device to :propertyUpdates: a Hash with further properties to update (or None) :timestamp: timestamp to be assigned to the update, if None, use self.getActualTimestamp() :return:

Source code in src/pythonKarabo/karabo/bound/device.py
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
def updateState(self, newState, propertyUpdates=None, timestamp=None):
    """Update the state property of the device to a new state.

    :param newState: the state to set the device to
    :propertyUpdates: a Hash with further properties to update (or None)
    :timestamp: timestamp to be assigned to the update,
                if None, use self.getActualTimestamp()
    :return:
    """
    assert isinstance(newState, State)
    stateName = newState.name
    self.log.DEBUG(f"updateState: {stateName}")
    if propertyUpdates is None:
        propertyUpdates = Hash()
    if timestamp is None:
        timestamp = self.getActualTimestamp()

    newStatus = None
    with self._stateChangeLock:
        if self._parameters["state"] != stateName:
            propertyUpdates.set("state", stateName)
            # Validator adds "indicateState" attribute
            if newState is State.ERROR:
                newStatus = "error"
            elif newState is State.UNKNOWN:
                newStatus = "unknown"
            else:
                statuses = ("error", "unknown")
                if self._sigslot.getInstanceInfo()["status"] in statuses:
                    newStatus = "ok"

        if propertyUpdates:
            self._setNoStateLock(propertyUpdates, timestamp)

    # Send potential instanceInfo update without state change lock
    if newStatus:
        self._sigslot.updateInstanceInfo(Hash("status", newStatus))
    # place new state as default reply to interested event initiators
    self._sigslot.reply(stateName)

writeChannel(channelName, data, timestamp=None, safeNDArray=False)

Write data to an output channel.

:param channelName: name given to an OUTPUT_CHANNEL in expectedParameters :param data: a Hash with keys as described in the Schema of the channel :param timestamp: optional timestamp; if none is given, the current timestamp is used :param safeNDArray: Boolean that should be set to 'True' if 'data' contains any 'NDArray' and their data is not changed after this 'writeChannel'. Otherwise, data will be copied if needed, i.e. when the output channel has to queue or serves inner-process receivers.

Example for an output channel sending an image (key: "image") and a frame number (key: "frame"):

imgArray = numpy.array(...) self.writeChannel("output", Hash("image", ImageData(imgArray), "frame", frameNumber)) Note: The methods 'writeChannel(..)' and 'signalEndOfStream(..)' must not be called concurrently.

Source code in src/pythonKarabo/karabo/bound/device.py
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
def writeChannel(self, channelName, data,
                 timestamp=None, safeNDArray=False):
    """Write data to an output channel.

    :param channelName: name given to an OUTPUT_CHANNEL in
                        expectedParameters
    :param data: a Hash with keys as described in the Schema of the
                 channel
    :param timestamp: optional timestamp; if none is given, the current
                      timestamp is used
    :param safeNDArray: Boolean that should be set to 'True' if 'data'
                        contains any 'NDArray' and their data is not
                        changed after this 'writeChannel'. Otherwise,
                        data will be copied if needed, i.e. when the output
                        channel has to queue or serves inner-process
                        receivers.

    Example for an output channel sending an image (key: "image") and
    a frame number (key: "frame"):

    imgArray = numpy.array(...)
    self.writeChannel("output", Hash("image", ImageData(imgArray),
                                     "frame", frameNumber))
    Note:
    The methods 'writeChannel(..)' and 'signalEndOfStream(..)'
    must not be called concurrently.
    """

    channel = self._sigslot.getOutputChannel(channelName)
    sourceName = f"{self.getInstanceId()}:{channelName}"
    if not timestamp:
        timestamp = self.getActualTimestamp()
    meta = ChannelMetaData(sourceName, timestamp)
    channel.write(data, meta)
    channel.update(safeNDArray=safeNDArray)

Device server serves as a launcher of python devices.

It scans 'plugins' directory for new plugins (python scripts) available and communicates its findings to master device server. It communicates XSD form of schema of user devices and starts such devices as separate process if user push "Initiate" button in GUI

Source code in src/pythonKarabo/karabo/bound/device_server.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
@KARABO_CONFIGURATION_BASE_CLASS
@KARABO_CLASSINFO("DeviceServer", "1.0")
class DeviceServer:
    """Device server serves as a launcher of python devices.

    It scans 'plugins' directory for new plugins (python scripts) available and
    communicates its findings to master device server.  It communicates XSD
    form of schema of user devices and starts such devices as separate process
    if user push "Initiate" button in GUI
    """
    instanceCountLock = threading.Lock()
    instanceCountPerDeviceServer = dict()

    @staticmethod
    def expectedParameters(expected):
        (
            STRING_ELEMENT(expected).key("serverId")
            .displayedName("Server ID")
            .description("The device-server instance id uniquely identifies a"
                         " device-server instance in the distributed system")
            .assignmentOptional().noDefaultValue()
            .commit(),

            STRING_ELEMENT(expected).key("hostName")
            .displayedName("Forced Hostname")
            .description(
                "The hostname can be optionally forced to a specific string. "
                "The host's definition will be used if not specified.")
            .assignmentOptional().noDefaultValue()
            .expertAccess()
            .init()
            .commit(),

            CHOICE_ELEMENT(expected).key("connection")
            .displayedName("Connection")
            .description("The connection to the communication layer of the"
                         " distributed system")
            .appendNodesOfConfigurationBase(Broker)
            .assignmentOptional().defaultValue(Broker.brokerTypeFromEnv())
            .expertAccess()
            .commit(),

            INT32_ELEMENT(expected).key("heartbeatInterval")
            .displayedName("Heartbeat interval")
            .description("The heartbeat interval")
            .assignmentOptional().defaultValue(10)
            .minInc(10)  # avoid too much traffic
            .expertAccess()
            .commit(),

            VECTOR_STRING_ELEMENT(expected).key("deviceClasses")
            .displayedName("Device Classes")
            .description("The device classes the server will manage")
            .assignmentOptional()
            .defaultValue([])
            .expertAccess()
            .commit(),

            STRING_ELEMENT(expected).key("init")
            .displayedName("Auto start")
            .description("Auto starts selected devices")
            .assignmentOptional().defaultValue("")
            .commit(),

            STRING_ELEMENT(expected).key("pluginNamespace")
            .displayedName("Plugin Namespace")
            .description("Namespace to search for plugins")
            .assignmentOptional().defaultValue(DEFAULT_NAMESPACE)
            .expertAccess()
            .commit(),

            VECTOR_STRING_ELEMENT(expected).key("serverFlags")
            .displayedName("Server Flags")
            .description("ServerFlags describing the device server, "
                         "the values must correspond to the enum ServerFlags")
            .assignmentOptional()
            .defaultValue([])
            .expertAccess()
            .commit(),

            NODE_ELEMENT(expected).key("Logger")
            .description("Logging settings")
            .displayedName("Logger")
            .appendParametersOf(Logger)
            .commit(),

            OVERWRITE_ELEMENT(expected).key("Logger.file.filename")
            # Will be assembled programmatically from environment and serverId
            .setNewAssignmentInternal()
            .commit(),

            STRING_ELEMENT(expected).key("timeServerId")
            .description("The instance id uniquely identifies a TimeServer"
                         " instance in the distributed system")
            .displayedName("TimeServer ID")
            .assignmentOptional().defaultValue("")
            .commit(),

            INT32_ELEMENT(expected).key("instantiationTimeout")
            .displayedName("Instantiation Timeout")
            .description("How long to wait for device coming up before "
                         "slotStartDevice fails")
            .unit(Unit.SECOND)
            .assignmentOptional().defaultValue(10)
            .commit(),
        )

    def signal_handler(self, signum, frame):
        if signum == signal.SIGINT:
            print('INTERRUPT : You pressed Ctrl-C!')
        else:
            print('INTERRUPT : You terminated me!')
        if self.ss is not None:
            # Better do not go via self.ss.call("", "slotKillServer"),
            # otherwise it will run later in another thread.
            self.slotKillServer()
        else:
            self.stopDeviceServer()

    def __init__(self, config):
        """Constructor"""
        if config is None:
            raise ValueError(
                "Input configuration for constructor should be Hash, not None")
        super().__init__()
        self.ss = self.log = None
        self.availableDevices = dict()
        self.deviceInstanceMap = dict()
        self._startingDevices = dict()  # needs protection by lock below
        self.deviceInstanceMapLock = threading.RLock()
        self.instantiationTimeout = config["instantiationTimeout"]
        if config.get('hostName') is not None:
            self.hostname = config['hostName']
        else:
            self.hostname = socket.gethostname().partition('.')[0]
        # Check if obsolete syntax still used...
        if config.has('autoStart'):
            raise KeyError(
                "'autoStart' syntax not supported anymore, use 'init'")
        self.autoStart = None
        if config.get("init") != "":
            asv = generateAutoStartHash(jsonToHash(config.get("init")))
            self.autoStart = asv['autoStart']

        self.deviceClasses = config.get("deviceClasses")
        self.pluginNamespace = config.get("pluginNamespace")
        self.timeServerId = config.get("timeServerId")

        if 'serverId' in config:
            self.serverid = config['serverId']
        else:
            self.serverid = self._generateDefaultServerId()

        self.connectionParameters = copy.copy(config['connection'])
        self.loggerParameters = None  # assemble in loadLogger
        self.pid = os.getpid()
        self.seqnum = 0

        # Start the logging system before the scanning of plugins, so any
        # error during plugin loading can be logged.
        self.loadLogger(config)
        self.log = Logger.getCategory(self.serverid)

        info = Hash("type", "server")
        info["serverId"] = self.serverid
        info["version"] = self.__class__.__version__
        info["host"] = self.hostname
        info["lang"] = "bound"
        info["log"] = config.get("Logger.priority")

        self.serverFlags = config.get("serverFlags")
        serverFlags = 0
        for flag in self.serverFlags:
            if flag in ServerFlags.__members__:
                serverFlags |= ServerFlags[flag]
            else:
                raise NotImplementedError(
                    f"Provided serverFlag is not supported: {flag}")
        info["serverFlags"] = serverFlags

        devicesInfo, scanLogs = self.scanPlugins(self.pluginNamespace)
        info.merge(devicesInfo)

        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        # Be aware: If signal handling is triggered before __init__ has
        # finished, self.ss might be set to None, causing problems below,
        # e.g. "'NoneType' object has no attribute 'getConnection'".
        # In worst case, SignalSlotable is constructed, but not yet assigned
        # to self.ss when the signal handler wants to reset it to None.
        # But the process stops nevertheless due to the EventLoop.stop().

        self.ss = SignalSlotable(self.serverid, self.connectionParameters,
                                 config["heartbeatInterval"], info)

        # Register before self.ss.start(), i.e before sending instanceNew:
        self._registerAndConnectSignalsAndSlots()

        # Start SignalSlotable object - multithreading begins
        self.ss.start()

        # Now we can log the postponed logging messages - could have been
        # done directly after assigning self.log, but prefer to do after
        # instanceNew triggered by self.ss.start():
        for level, message in scanLogs:
            getattr(self.log, level)(message)

        msg = "DeviceServer starts on host '{0.hostname}' " \
              "with pid {0.pid}, broker: {1}"
        self.log.INFO(msg.format(self,
                                 self.ss.getConnection().getBrokerUrl()))

        self.doAutoStart()

    def _generateDefaultServerId(self):
        return self.hostname + "_Server_" + str(os.getpid())

    def loadLogger(self, inputCfg):
        self.loggerParameters = copy.copy(inputCfg["Logger"])
        # The file logger filename is completely specified here.
        path = os.path.join(os.environ['KARABO'], "var", "log", self.serverid)
        if not os.path.isdir(path):
            os.makedirs(path)
        path = os.path.join(path, 'device-server.log')
        self.loggerParameters.set('file.filename', path)

        Logger.configure(self.loggerParameters)
        Logger.useOstream()
        Logger.useFile()
        Logger.useCache()

    def _registerAndConnectSignalsAndSlots(self):
        self.ss.registerSlot(self.slotStartDevice)
        self.ss.registerSlot(self.slotKillServer)
        self.ss.registerSlot(self.slotDeviceUp)
        self.ss.registerSlot(self.slotDeviceGone)
        self.ss.registerSlot(self.slotGetClassSchema)
        self.ss.registerSlot(self.slotLoggerPriority)
        self.ss.registerSlot(self.slotLoggerContent)

    def scanPlugins(self, pluginNamespace):
        """Scan for available device classes

        Returns Hash with keys "deviceClasses" and "visibilities" to be merged
        into instance info and a list of log messages to be send.
        Inside the list there are tuples of two string: log level (e.g. "INFO")
        and message.
        Also fills self.availableDevices dictionary.
        """
        loaderCfg = Hash("pluginNamespace", pluginNamespace)
        loader = PluginLoader.create("PythonPluginLoader", loaderCfg)
        entrypoints = loader.update()
        logs = [("ERROR",
                 f"scanPlugins: Cannot load device plugin {name} -- {exc}")
                for name, exc in loader.plugin_errors.items()]
        for ep in entrypoints:
            if not self.deviceClasses or ep.name in self.deviceClasses:
                # All entry points have been verified already
                deviceClass = ep.load()
                classid = deviceClass.__classid__
                try:
                    schema = Configurator(PythonDevice).getSchema(classid)
                    self.availableDevices[classid] = {"module": ep.name,
                                                      "schema": schema}
                    logs.append(("INFO",
                                 'Successfully loaded plugin: "{}"'.format(
                                     ep.name)))
                # A generic handler is used here due to the different kind
                # of exceptions that may be raised when obtained the schema
                # for the just loaded plugin.
                except Exception as e:
                    m = "Failure while building schema for class {}, base " \
                        "class {} and bases {} : {}"\
                        .format(classid, deviceClass.__base_classid__,
                                deviceClass.__bases_classid__, repr(e))
                    # repr(e) also includes type
                    logs.append(("ERROR", m))

        instInfo = Hash("deviceClasses",
                        [classid for classid in self.availableDevices.keys()])

        return instInfo, logs

    def doAutoStart(self):
        if self.autoStart is None:
            return
        for entry in self.autoStart:
            # entry is a Hash with a single key which is the classid
            classId = entry.getKeys()[0]
            if classId in self.availableDevices:
                config = entry.get(classId)
                hsh = Hash("classId", classId,
                           "configuration", config)
                if 'deviceId' in config:
                    hsh.set('deviceId', config.get('deviceId'))
                self.instantiateDevice(hsh)

    def stopDeviceServer(self):
        # HACK: Sometimes the C++ destructor of SignalSlotable is not called
        #       and then instanceGone is not sent. This can happen when many
        #       variables are still referring to the SignalSlotable here.
        #       This has been seen when stopped early in the initialisation
        #       procedure, leading to a very high (>27000) refcount.
        #       If we are the last ones, it is guaranteed that the destructor
        #       is called.
        #
        # Take over SignalSlotable to avoid new references in other threads:
        localSigSlot = self.ss
        self.ss = None
        refCount = sys.getrefcount(localSigSlot)
        if refCount > 2:
            # count of 2: localSigSlot and the one internal to getrefcount
            print("Forced call to slotInstanceGone due to refcount", refCount)
            localSigSlot.call("*", "slotInstanceGone",
                              self.serverid, localSigSlot.getInstanceInfo())
        # HACK end

        EventLoop.stop()

    def slotStartDevice(self, configuration):
        self.instantiateDevice(configuration)

    def instantiateDevice(self, input_config):
        classid = input_config['classId']

        # Get configuration
        config = copy.copy(input_config['configuration'])

        # Inject serverId
        config['_serverId_'] = self.serverid

        # Inject deviceId
        if ('deviceId' not in input_config
                or len(input_config['deviceId']) == 0):
            config['_deviceId_'] = self._generateDefaultDeviceInstanceId(
                classid)
        else:
            config['_deviceId_'] = input_config['deviceId']

        self.log.INFO("Trying to start a '{}' with device id '{}'"
                      "...".format(classid, config['_deviceId_']))
        self.log.DEBUG(
            f"with the following configuration:\n{input_config}")

        # Inject HostName
        config['hostName'] = self.hostname

        # If not explicitely specified, let device inherit logger priority
        if not config.has("Logger.priority"):
            config["Logger.priority"] = self.loggerParameters["priority"]

        # Before starting device process, validate config
        schema = Configurator(PythonDevice).getSchema(classid)
        validator = Validator()
        (ok, msg, _) = validator.validate(schema, config)
        if not ok:
            msg = msg.strip()  # cut-off trailing newline...
            self.log.WARN(f"Failed to start '{config['_deviceId_']}': {msg}")
            self.ss.reply(ok, msg)
            return

        # Now few config manipulation intentionally AFTER 'validate':
        # Add connection type and parameters used by device server for
        # connecting to broker.
        config['_connection_'] = self.connectionParameters

        # Add temporary namespace variable
        config['_pluginNamespace_'] = self.pluginNamespace

        # Add time server ID configured for device server.
        config['timeServerId'] = self.timeServerId

        # Also add config for Logger appenders
        for appender in ["ostream", "file", "cache"]:
            config["_logger_." + appender] = self.loggerParameters[appender]

        reply = self.ss.createAsyncReply()

        try:
            self._launchDevice(config, classid, reply)
        except Exception as e:
            msg = f"Device of class '{classid}' could not be started: {e}"
            details = traceback.format_exc()
            self.log.WARN(f"{msg}:\nFailure details:\n{details}")
            # Cannot call AsyncReply object directly in slot, so post:
            EventLoop.post(lambda: reply.error(msg, details))

    def _launchDevice(self, config, classid, reply):

        deviceid = config["_deviceId_"]  # exists, see instantiateDevice

        modname = self.availableDevices[classid]["module"]

        # Create unique filename in /tmp - without '/' from deviceid...
        # .bin indicates binary format:
        # XML has trouble with VECTOR_STRING where string contains comma
        while True:
            filename = (f"/tmp/{modname}.{classid}."
                        f"{deviceid.replace(os.path.sep, '_')}"
                        f".configuration_{self.pid}_{self.seqnum}.bin")
            if os.path.isfile(filename):
                self.seqnum += 1
            else:
                break

        saveToFile(config, filename)
        params = [modname, classid, filename]

        with self.deviceInstanceMapLock:
            if deviceid in self.deviceInstanceMap:
                # Already there! Check whether previous process is alive:
                prevLauncher = self.deviceInstanceMap[deviceid]
                if prevLauncher.child.poll() is None:
                    # Process still up. Check Karabo communication by ping:
                    request = self.ss.request(deviceid, "slotPing", 1)
                    try:
                        # Too lazy to use async techniques for this corner case
                        request.waitForReply(3000)  # in milliseconds
                    except TimeoutError:
                        # Indeed dead Karabo-wise:
                        self.log.WARN("Kill previous incarnation of "
                                      f"'{deviceid}' since reply to "
                                      "ping timed out.")
                        prevLauncher.kill()
                    except Exception as e:
                        # Unexpected exception - give up
                        raise e
                    else:
                        # Technically, it could be alive on another server,
                        # but who cares about that detail...
                        # Just raising RuntimeError is in principle also OK
                        # Cannot call AsyncReply() directly in slot, so post:
                        EventLoop.post(lambda: reply(False,
                                                     f"{deviceid} already "
                                                     "instantiated and alive"))
                        return

            launcher = Launcher(params)
            launcher.start()
            self.deviceInstanceMap[deviceid] = launcher
            # Keep track of being starting to reply when succeeded (or timeout)
            self._startingDevices[deviceid] = (reply, time.time())
            if len(self._startingDevices) == 1:  # otherwise already running
                EventLoop.post(self._checkStartingDevices, 1)

    def _checkStartingDevices(self):
        failed = []
        somethingLeft = False
        with self.deviceInstanceMapLock:
            for devId, (reply, startedAt) in self._startingDevices.items():
                if (time.time() - startedAt) > self.instantiationTimeout:
                    failed.append((devId, reply))
            for devId, _ in failed:
                del self._startingDevices[devId]
                # But don't touch self.deviceInstanceMap - device may be "late"
            if self._startingDevices:
                somethingLeft = True

        # Reply without lock since sending messages is blocking:
        for devId, reply in failed:
            msg = (f"Timeout of instantiation: {devId} did not confirm it is "
                   f"up within {self.instantiationTimeout} seconds")
            self.log.WARN(msg)
            reply(False, msg)

        if somethingLeft:
            EventLoop.post(self._checkStartingDevices, 0.5)

    def slotKillServer(self):
        if self.log:
            self.log.INFO("Received kill signal")
        else:  # might get killed by signal handler before setting up logging
            print("Received kill signal")
        with self.deviceInstanceMapLock:
            # Break "loop" to check for starting devices
            self._startingDevices.clear()
            # Loop twice: First to quickly tell all devices to go down and then
            #             to wait until they are indeed down (or need killing)
            for deviceid in self.deviceInstanceMap:
                self.ss.call(deviceid, "slotKillDevice")
            for deviceid, launcher in self.deviceInstanceMap.items():
                if launcher:
                    try:
                        launcher.join()
                    except TimeoutExpired:
                        self.log.WARN("Timeout on server shutdown while"
                                      " stopping the process for '{}'"
                                      "... SIGKILL".format(deviceid))
                        launcher.kill()
            self.deviceInstanceMap = {}
        try:
            self.ss.reply(self.serverid)
        except Exception as e:
            if self.log:  # see above
                msg = ("Did not notify distributed system of server shutdown:"
                       "\n {}").format(e)
                self.log.ERROR(msg)
        finally:
            self.stopDeviceServer()

    def slotDeviceUp(self, instanceId, success, reason):
        "Slot for device to tell us whether it managed to get alive"
        awaited = None
        with self.deviceInstanceMapLock:
            awaited = self._startingDevices.pop(instanceId, None)
        if awaited:
            asyncReply = awaited[0]
            if success:
                asyncReply(success, instanceId)
            else:
                msg = (f"could not instantiate device {instanceId}."
                       f" Reason: {reason}")
                asyncReply(success, msg)
        else:
            self.log.WARN(f"Unexpected slotDeviceUp for {instanceId}")
            # No need to inform caller about failure via raising an exception

    def slotDeviceGone(self, instanceId):
        self.log.INFO("Device '{0}' notifies '{1.serverid}' about its future"
                      " death.".format(instanceId, self))
        with self.deviceInstanceMapLock:
            if instanceId in self.deviceInstanceMap:
                launcher = self.deviceInstanceMap[instanceId]
                if launcher:
                    try:
                        launcher.join()
                    except TimeoutExpired:
                        self.log.WARN("Timeout on device shutdown while"
                                      " stopping the process for '{}'"
                                      "... SIGKILL".format(instanceId))
                        launcher.kill()
                del self.deviceInstanceMap[instanceId]
            self.log.INFO("Device '{}' removed from server."
                          .format(instanceId))

    def slotGetClassSchema(self, classid):
        try:
            schema = Configurator(PythonDevice).getSchema(classid)
            self.ss.reply(schema, classid, self.serverid)
        except AttributeError as e:
            self.log.WARN(f"Replied empty schema due to : {str(e)}")
            self.ss.reply(Schema(), classid, self.serverid)

    def slotLoggerPriority(self, newprio):
        # In contrast to C++, the new priority will not be "forwarded" to
        # existing devices. Python devices have their own slotLoggerPriority
        # which allows priority setting device by device.
        # But future device instantiations should inherit their priority from
        # the current value of the server:
        oldprio = Logger.getPriority()
        Logger.setPriority(newprio)
        self.log.INFO(
            f"Logger Priority changed : {oldprio} ==> {newprio}")
        # Also devices started in future get the new priority by default:
        self.loggerParameters["priority"] = newprio
        # Merge the new log priority into the instanceInfo
        self.ss.updateInstanceInfo(Hash("log", newprio))

    def slotLoggerContent(self, info):
        """Slot call to receive logger content from the CacheLogger

        replies with a Hash containing a key, `serverId`
        and a `content` containing a vector of hashes formatted in the
        same way the broker based logging uses. For details:
        ``src/karabo/log/CacheLogger.cc``

        :param info: input Hash containing an optional `logs` integer
                     defining the maximum number of lines returned
        """
        # create a map where to store the devices' logs
        with self.deviceInstanceMapLock:
            log_map = dict.fromkeys(self.deviceInstanceMap)

        if not log_map:  # no devices => direct reply instead of an async one
            nMessages = info.get("logs", default=KARABO_LOGGER_CONTENT_DEFAULT)
            content = Logger.getCachedContent(nMessages)
            self.ss.reply(Hash("serverId", self.serverid, "content", content))
            return

        replyLock = threading.Lock()

        areply = self.ss.createAsyncReply()
        # copy because the info object will be cleaned up after
        # this function has returned
        info = copy.copy(info)

        class Handler:
            def __init__(s, deviceId):
                s.deviceId = deviceId

            def on_reply(s, reply):
                # copy because the reply object will be
                # cleaned up after the handler has returned
                _copy = [copy.copy(entry) for entry in reply['content']]
                log_map[s.deviceId] = _copy
                s.done()

            def on_error(s, msg, details):
                msg = (f"Missing Logger Content from '{s.deviceId}' : "
                       f"{msg} - DETAILS {details}")
                self.log.WARN(msg)
                log_map.pop(s.deviceId, None)
                s.done()

            def done(s):
                # make sure that only one handler will call
                # `_replySlotLoggerContent`.
                with replyLock:
                    if all(log_map.values()):
                        self._replySlotLoggerContent(info, log_map, areply)

        for deviceId in log_map.keys():
            # request the content from the devices in the server.
            req = self.ss.request(deviceId, "slotLoggerContent", info)
            h = Handler(deviceId)
            req.receiveAsync(h.on_reply, h.on_error, timeoutMs=5000)

    def _replySlotLoggerContent(self, info, log_map, areply):
        nMessages = info.get("logs", default=KARABO_LOGGER_CONTENT_DEFAULT)
        content = list(log_map.values())
        # add the latest server logs.
        content.append(Logger.getCachedContent(nMessages))
        content = chain.from_iterable(content)
        # sort by timestamp and get the last "nMessages" entries
        content = sorted(content, key=lambda entry: entry['timestamp'])
        # reply
        areply(Hash("serverId", self.serverid,
                    "content", content[-1 * nMessages:]))

    def _generateDefaultDeviceInstanceId(self, devClassId):
        cls = self.__class__
        with cls.instanceCountLock:
            if self.serverid not in cls.instanceCountPerDeviceServer:
                cls.instanceCountPerDeviceServer[self.serverid] = 0
            cls.instanceCountPerDeviceServer[self.serverid] += 1
            _index = cls.instanceCountPerDeviceServer[self.serverid]
            tokens = self.serverid.split("_")
            if tokens.pop() == str(os.getpid()):
                _domain = tokens.pop(0) + "-" + tokens.pop()
                _id = _domain + "_" + devClassId + "_" + str(_index)
            else:
                _id = self.serverid + "_" + devClassId + "_" + str(_index)
            return _id

__init__(config)

Constructor

Source code in src/pythonKarabo/karabo/bound/device_server.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def __init__(self, config):
    """Constructor"""
    if config is None:
        raise ValueError(
            "Input configuration for constructor should be Hash, not None")
    super().__init__()
    self.ss = self.log = None
    self.availableDevices = dict()
    self.deviceInstanceMap = dict()
    self._startingDevices = dict()  # needs protection by lock below
    self.deviceInstanceMapLock = threading.RLock()
    self.instantiationTimeout = config["instantiationTimeout"]
    if config.get('hostName') is not None:
        self.hostname = config['hostName']
    else:
        self.hostname = socket.gethostname().partition('.')[0]
    # Check if obsolete syntax still used...
    if config.has('autoStart'):
        raise KeyError(
            "'autoStart' syntax not supported anymore, use 'init'")
    self.autoStart = None
    if config.get("init") != "":
        asv = generateAutoStartHash(jsonToHash(config.get("init")))
        self.autoStart = asv['autoStart']

    self.deviceClasses = config.get("deviceClasses")
    self.pluginNamespace = config.get("pluginNamespace")
    self.timeServerId = config.get("timeServerId")

    if 'serverId' in config:
        self.serverid = config['serverId']
    else:
        self.serverid = self._generateDefaultServerId()

    self.connectionParameters = copy.copy(config['connection'])
    self.loggerParameters = None  # assemble in loadLogger
    self.pid = os.getpid()
    self.seqnum = 0

    # Start the logging system before the scanning of plugins, so any
    # error during plugin loading can be logged.
    self.loadLogger(config)
    self.log = Logger.getCategory(self.serverid)

    info = Hash("type", "server")
    info["serverId"] = self.serverid
    info["version"] = self.__class__.__version__
    info["host"] = self.hostname
    info["lang"] = "bound"
    info["log"] = config.get("Logger.priority")

    self.serverFlags = config.get("serverFlags")
    serverFlags = 0
    for flag in self.serverFlags:
        if flag in ServerFlags.__members__:
            serverFlags |= ServerFlags[flag]
        else:
            raise NotImplementedError(
                f"Provided serverFlag is not supported: {flag}")
    info["serverFlags"] = serverFlags

    devicesInfo, scanLogs = self.scanPlugins(self.pluginNamespace)
    info.merge(devicesInfo)

    signal.signal(signal.SIGINT, self.signal_handler)
    signal.signal(signal.SIGTERM, self.signal_handler)

    # Be aware: If signal handling is triggered before __init__ has
    # finished, self.ss might be set to None, causing problems below,
    # e.g. "'NoneType' object has no attribute 'getConnection'".
    # In worst case, SignalSlotable is constructed, but not yet assigned
    # to self.ss when the signal handler wants to reset it to None.
    # But the process stops nevertheless due to the EventLoop.stop().

    self.ss = SignalSlotable(self.serverid, self.connectionParameters,
                             config["heartbeatInterval"], info)

    # Register before self.ss.start(), i.e before sending instanceNew:
    self._registerAndConnectSignalsAndSlots()

    # Start SignalSlotable object - multithreading begins
    self.ss.start()

    # Now we can log the postponed logging messages - could have been
    # done directly after assigning self.log, but prefer to do after
    # instanceNew triggered by self.ss.start():
    for level, message in scanLogs:
        getattr(self.log, level)(message)

    msg = "DeviceServer starts on host '{0.hostname}' " \
          "with pid {0.pid}, broker: {1}"
    self.log.INFO(msg.format(self,
                             self.ss.getConnection().getBrokerUrl()))

    self.doAutoStart()

scanPlugins(pluginNamespace)

Scan for available device classes

Returns Hash with keys "deviceClasses" and "visibilities" to be merged into instance info and a list of log messages to be send. Inside the list there are tuples of two string: log level (e.g. "INFO") and message. Also fills self.availableDevices dictionary.

Source code in src/pythonKarabo/karabo/bound/device_server.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def scanPlugins(self, pluginNamespace):
    """Scan for available device classes

    Returns Hash with keys "deviceClasses" and "visibilities" to be merged
    into instance info and a list of log messages to be send.
    Inside the list there are tuples of two string: log level (e.g. "INFO")
    and message.
    Also fills self.availableDevices dictionary.
    """
    loaderCfg = Hash("pluginNamespace", pluginNamespace)
    loader = PluginLoader.create("PythonPluginLoader", loaderCfg)
    entrypoints = loader.update()
    logs = [("ERROR",
             f"scanPlugins: Cannot load device plugin {name} -- {exc}")
            for name, exc in loader.plugin_errors.items()]
    for ep in entrypoints:
        if not self.deviceClasses or ep.name in self.deviceClasses:
            # All entry points have been verified already
            deviceClass = ep.load()
            classid = deviceClass.__classid__
            try:
                schema = Configurator(PythonDevice).getSchema(classid)
                self.availableDevices[classid] = {"module": ep.name,
                                                  "schema": schema}
                logs.append(("INFO",
                             'Successfully loaded plugin: "{}"'.format(
                                 ep.name)))
            # A generic handler is used here due to the different kind
            # of exceptions that may be raised when obtained the schema
            # for the just loaded plugin.
            except Exception as e:
                m = "Failure while building schema for class {}, base " \
                    "class {} and bases {} : {}"\
                    .format(classid, deviceClass.__base_classid__,
                            deviceClass.__bases_classid__, repr(e))
                # repr(e) also includes type
                logs.append(("ERROR", m))

    instInfo = Hash("deviceClasses",
                    [classid for classid in self.availableDevices.keys()])

    return instInfo, logs

slotDeviceUp(instanceId, success, reason)

Slot for device to tell us whether it managed to get alive

Source code in src/pythonKarabo/karabo/bound/device_server.py
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
def slotDeviceUp(self, instanceId, success, reason):
    "Slot for device to tell us whether it managed to get alive"
    awaited = None
    with self.deviceInstanceMapLock:
        awaited = self._startingDevices.pop(instanceId, None)
    if awaited:
        asyncReply = awaited[0]
        if success:
            asyncReply(success, instanceId)
        else:
            msg = (f"could not instantiate device {instanceId}."
                   f" Reason: {reason}")
            asyncReply(success, msg)
    else:
        self.log.WARN(f"Unexpected slotDeviceUp for {instanceId}")

slotLoggerContent(info)

Slot call to receive logger content from the CacheLogger

replies with a Hash containing a key, serverId and a content containing a vector of hashes formatted in the same way the broker based logging uses. For details: src/karabo/log/CacheLogger.cc

:param info: input Hash containing an optional logs integer defining the maximum number of lines returned

Source code in src/pythonKarabo/karabo/bound/device_server.py
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
def slotLoggerContent(self, info):
    """Slot call to receive logger content from the CacheLogger

    replies with a Hash containing a key, `serverId`
    and a `content` containing a vector of hashes formatted in the
    same way the broker based logging uses. For details:
    ``src/karabo/log/CacheLogger.cc``

    :param info: input Hash containing an optional `logs` integer
                 defining the maximum number of lines returned
    """
    # create a map where to store the devices' logs
    with self.deviceInstanceMapLock:
        log_map = dict.fromkeys(self.deviceInstanceMap)

    if not log_map:  # no devices => direct reply instead of an async one
        nMessages = info.get("logs", default=KARABO_LOGGER_CONTENT_DEFAULT)
        content = Logger.getCachedContent(nMessages)
        self.ss.reply(Hash("serverId", self.serverid, "content", content))
        return

    replyLock = threading.Lock()

    areply = self.ss.createAsyncReply()
    # copy because the info object will be cleaned up after
    # this function has returned
    info = copy.copy(info)

    class Handler:
        def __init__(s, deviceId):
            s.deviceId = deviceId

        def on_reply(s, reply):
            # copy because the reply object will be
            # cleaned up after the handler has returned
            _copy = [copy.copy(entry) for entry in reply['content']]
            log_map[s.deviceId] = _copy
            s.done()

        def on_error(s, msg, details):
            msg = (f"Missing Logger Content from '{s.deviceId}' : "
                   f"{msg} - DETAILS {details}")
            self.log.WARN(msg)
            log_map.pop(s.deviceId, None)
            s.done()

        def done(s):
            # make sure that only one handler will call
            # `_replySlotLoggerContent`.
            with replyLock:
                if all(log_map.values()):
                    self._replySlotLoggerContent(info, log_map, areply)

    for deviceId in log_map.keys():
        # request the content from the devices in the server.
        req = self.ss.request(deviceId, "slotLoggerContent", info)
        h = Handler(deviceId)
        req.receiveAsync(h.on_reply, h.on_error, timeoutMs=5000)

Bases: DeviceClient

Handle certain device client responsibilities which interact with other Python code in Karabo.

Source code in src/pythonKarabo/karabo/bound/device_client.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class DeviceClient(BoundDeviceClient):
    """ Handle certain device client responsibilities which interact with other
    Python code in Karabo.
    """

    def get(self, instanceId, *args, **kw):
        """ Handle conversion of returned State and AlarmCondition objects."""
        value = super().get(instanceId, *args, **kw)

        if not isinstance(value, Hash):
            schema = self.getDeviceSchema(instanceId)
            key = args[0]
            if schema.isLeaf(key):
                if schema.hasClassId(key):
                    classId = schema.getClassId(key)
                    if classId == KARABO_CLASS_ID_STATE:
                        return State(value)
                    elif classId == KARABO_CLASS_ID_ALARM:
                        return AlarmCondition(value)

        return value

get(instanceId, *args, **kw)

Handle conversion of returned State and AlarmCondition objects.

Source code in src/pythonKarabo/karabo/bound/device_client.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def get(self, instanceId, *args, **kw):
    """ Handle conversion of returned State and AlarmCondition objects."""
    value = super().get(instanceId, *args, **kw)

    if not isinstance(value, Hash):
        schema = self.getDeviceSchema(instanceId)
        key = args[0]
        if schema.isLeaf(key):
            if schema.hasClassId(key):
                classId = schema.getClassId(key)
                if classId == KARABO_CLASS_ID_STATE:
                    return State(value)
                elif classId == KARABO_CLASS_ID_ALARM:
                    return AlarmCondition(value)

    return value

Worker

Bases: Thread

Source code in src/pythonKarabo/karabo/bound/worker.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class Worker(threading.Thread):

    def __init__(self, callback=None, timeout=-1, repetition=-1, daemon=True):
        """Constructs the Worker thread, that is by default a daemon thread.

           Please note that daemon threads may not release resources
           properly when stopped abruptly.
        """
        threading.Thread.__init__(self, daemon=daemon)
        self.callback = callback
        self.onError = None
        self.onExit = None
        self.timeout = timeout
        self.repetition = repetition
        self.running = False
        self.aborted = False
        self.suspended = False
        self.counter = -1
        self.cv = threading.Condition()  # cv = condition variable
        self.dq = deque()

    def set(self, callback, timeout=-1, repetition=-1):
        self.callback = callback
        self.timeout = timeout
        self.repetition = repetition

    def setTimeout(self, timeout=-1):
        self.timeout = timeout

    def setRepetition(self, repetition=-1):
        self.repetition = repetition

    def setErrorHandler(self, handler):
        self.onError = handler
        return self

    def setExitHandler(self, handler):
        self.onExit = handler
        return self

    def is_running(self):
        return self.running

    def push(self, o):
        if self.running:
            with self.cv:
                self.dq.append(o)
                self.cv.notify()

    def isRepetitionCounterExpired(self):
        return self.counter == 0

    def run(self):
        self.running = True
        self.aborted = False
        self.suspended = False
        self.counter = self.repetition
        try:
            if not callable(self.callback):
                raise ValueError("No callback is registered in Worker")
            while not self.aborted:
                t = None
                if self.counter == 0:
                    if callable(self.onExit):
                        self.onExit()
                    break
                if not self.running:
                    break
                if self.suspended:
                    with self.cv:
                        while self.suspended:
                            self.cv.wait()
                        if self.aborted or not self.running:
                            break
                    continue
                if self.timeout < 0:
                    with self.cv:
                        while len(self.dq) == 0:
                            self.cv.wait()
                        if not self.suspended:
                            t = self.dq.popleft()
                elif self.timeout > 0:
                    with self.cv:
                        if len(self.dq) == 0:
                            # self.timeout in milliseconds
                            self.cv.wait(float(self.timeout) / 1000)
                        if len(self.dq) != 0 and not self.suspended:
                            t = self.dq.popleft()
                else:
                    with self.cv:
                        if len(self.dq) != 0 and not self.suspended:
                            t = self.dq.popleft()
                if self.suspended:
                    continue
                if t is not None:
                    if self.stopCondition(t):
                        if callable(self.onExit):
                            self.onExit()
                        break
                if self.counter > 0:
                    self.counter -= 1
                if self.running:
                    self.callback()
        except Exception:
            if callable(self.onError):
                self.onError(traceback.format_exc())
            else:
                traceback.print_exc()

        if self.running:
            self.running = False

    def stopCondition(self, obj):
        return False

    def start(self):
        if not self.running:
            self.suspended = False
            super().start()
        if self.suspended:
            with self.cv:
                self.suspended = False
                self.cv.notify()
        return self

    def stop(self):
        if self.running:
            with self.cv:
                self.running = False
                self.suspended = False
                self.cv.notify()
        return self

    def abort(self):
        self.aborted = True
        self.running = False
        if self.suspended:
            with self.cv:
                self.suspended = False
                self.cv.notify()
        if len(self.dq) != 0:
            with self.cv:
                self.dq.clear()
        return self

    def pause(self):
        if not self.suspended:
            with self.cv:
                self.suspended = True
                self.cv.notify()

__init__(callback=None, timeout=-1, repetition=-1, daemon=True)

Constructs the Worker thread, that is by default a daemon thread.

Please note that daemon threads may not release resources properly when stopped abruptly.

Source code in src/pythonKarabo/karabo/bound/worker.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, callback=None, timeout=-1, repetition=-1, daemon=True):
    """Constructs the Worker thread, that is by default a daemon thread.

       Please note that daemon threads may not release resources
       properly when stopped abruptly.
    """
    threading.Thread.__init__(self, daemon=daemon)
    self.callback = callback
    self.onError = None
    self.onExit = None
    self.timeout = timeout
    self.repetition = repetition
    self.running = False
    self.aborted = False
    self.suspended = False
    self.counter = -1
    self.cv = threading.Condition()  # cv = condition variable
    self.dq = deque()

KARABO_CLASSINFO(classid, version)

This decorator should be placed just before class definition. It adds some variables used in KARABO configuration infrastructure. It has two parameters: "classId" and "version" similar to corresponding C++ macro: Example: @KARABO_CLASSINFO("Shape","1.0") class Shape(object): ...

Source code in src/pythonKarabo/karabo/bound/decorators.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def KARABO_CLASSINFO(classid, version):
    """
    This decorator should be placed just before class definition. It adds some
    variables used in KARABO configuration infrastructure. It has two
    parameters: "classId" and "version" similar to corresponding C++ macro:
    Example:
            @KARABO_CLASSINFO("Shape","1.0")
            class Shape(object):
                ...
    """
    def realDecorator(theClass):
        if isinstance(theClass, type):
            theClass.__classid__ = str(classid)
            theClass.__version__ = version
            if hasattr(theClass, "__base_classid__"):
                Configurator(theClass.__base_classid__).registerClass(theClass)
                theClass.__bases_classid__ = []
                for base in theClass.__bases__:
                    for i in getattr(base, '__bases_classid__', ()):
                        if i not in theClass.__bases_classid__:
                            theClass.__bases_classid__.append(i)
                theClass.__bases_classid__ = tuple(theClass.__bases_classid__)
        return theClass
    return realDecorator

KARABO_CONFIGURATION_BASE_CLASS(theClass)

This decorator should be placed just before "KARABO_CLASSINFO" decorator. It registers the class as the base configurable class and adds the following classmethods: "create", "createNode", "createChoice", "createList", "getSchema" and "getRegisteredClasses". It has no parameters. Example: @KARABO_CONFIGURATION_BASE_CLASS @KARABO_CLASSINFO("Shape","1.0") class Shape(object): ...

Source code in src/pythonKarabo/karabo/bound/decorators.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def KARABO_CONFIGURATION_BASE_CLASS(theClass):
    """
    This decorator should be placed just before "KARABO_CLASSINFO" decorator.
    It registers the class as the base configurable class and adds the
    following classmethods: "create", "createNode", "createChoice",
    "createList", "getSchema" and "getRegisteredClasses". It has no parameters.
    Example:
            @KARABO_CONFIGURATION_BASE_CLASS
            @KARABO_CLASSINFO("Shape","1.0")
            class Shape(object):
                ...
    """
    if isinstance(theClass, type):
        Configurator.registerAsBaseClass(theClass)

        def create(cls, *args):
            """
            The factory classmethod to create the instance of class with
            "classId" using input "configuration".
            Example:
                instance = Shape.create("EditableCircle",
                                        Hash("radius", 12.345))

            The factory classmethod to create instance according input
            "configuration".  The configuration should have "classId"
            of class to be created as a root element.
            Example:
                    configuration = Hash("EditableCircle.radius", 12.345)
                    instance = Shape.create(configuration)
            """
            if len(args) == 1:
                return Configurator(cls.__base_classid__).create(args[0])
            elif len(args) == 2:
                return Configurator(cls.__base_classid__).create(args[0],
                                                                 args[1])
            elif len(args) == 3:
                return Configurator(cls.__base_classid__).create(args[0],
                                                                 args[1],
                                                                 args[2])
            else:
                raise TypeError("Wrong number of arguments or their types")

        create = classmethod(create)
        theClass.create = create

        def createNode(cls, nodename, classid, configuration):
            """
            The helper classmethod to create instance of class specified by
            "classId" using sub-configuration specified by "nodeName" which has
            to be a part of input "configuration".
            """
            return Configurator(cls.__base_classid__).createNode(nodename,
                                                                 classid,
                                                                 configuration)

        createNode = classmethod(createNode)
        theClass.createNode = createNode

        def createChoice(cls, choice, config):
            """
            The helper classmethod to create the instance using "choiceName"
            and input "configuration".
            """
            return Configurator(cls.__base_classid__).createChoice(choice,
                                                                   config)

        createChoice = classmethod(createChoice)
        theClass.createChoice = createChoice

        def createList(cls, listname, configuration):
            """
            The helper method to create the list of instances using "listName"
            as a key to the list of configs in input "configuration".
            The configurations will be validated.
            """
            return Configurator(cls.__base_classid__).createList(listname,
                                                                 configuration)

        createList = classmethod(createList)
        theClass.createList = createList

        def getSchema(cls, classid, rules=None):
            """
            Use this classmethod to get schema for class with "classid" using
            assembly "rules"
            Example:
                    schema = Shape.getSchema("Rectangle")
                or
                    schema = Shape.getSchema("Rectangle", AssemblyRules())
            """
            return Configurator(cls.__base_classid__).getSchema(classid, rules)

        getSchema = classmethod(getSchema)
        theClass.getSchema = getSchema

        def getRegisteredClasses(cls):
            return Configurator(cls.__base_classid__).getRegisteredClasses()

        getRegisteredClasses = classmethod(getRegisteredClasses)
        theClass.getRegisteredClasses = getRegisteredClasses

    return theClass

Provides factorized configuration

Configurator is the singleton class that keeps methods for registration and creation other classes that are configurable classes

Source code in src/pythonKarabo/karabo/bound/configurator.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
class Configurator:
    """Provides factorized configuration

    Configurator is the singleton class that keeps methods for registration and
    creation other classes that are configurable classes
    """
    _instance = None
    registry = {}

    @staticmethod
    def registerAsBaseClass(theClass):
        """Register a class as a base class in the configurator

        :param theClass: to be registered
        """
        if theClass.__classid__ not in Configurator.registry:
            theClass.__base_classid__ = theClass.__classid__
            theClass.__bases_classid__ = (theClass.__classid__,)
            Configurator.registry[theClass.__classid__] = {}
        # self-registering
        shrt = theClass.__classid__
        Configurator.registry[shrt][shrt] = theClass

    def __init__(self, classid):
        """
        The argument to the constructor may be the classid of a configurable
        class or configurable class itself:
        Example:
                c = Configurator(ConfigurableClass)
        or
                c = Configurator("ConfigurableClassId")
        """
        if isinstance(classid, type):
            classid = classid.__classid__
        if not isinstance(classid, str):
            raise TypeError("The argument type '{}' is not allowed. "
                            "Must be a class or a str.".format(type(classid)))
        if classid not in Configurator.registry:
            raise AttributeError(
                "Argument is not a class or classid of registered base class")
        self.baseRegistry = Configurator.registry[classid]
        assert classid in self.baseRegistry

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super().__new__(cls)
        return cls._instance

    def registerClass(self, derived):
        """
        Register a derived class, i.e. a class deriving from a registered
        base class in the configuration system.
        :param derived:
        :return:
        """
        self.baseRegistry[derived.__classid__] = derived
        return self

    def getSchema(self, classid, rules=AssemblyRules()):
        """
        Get schema for class with "classid" derived from base class given to
        constructor using assembly "rules".
        Example:
                schema = Configurator(Shape).getSchema("Rectangle")
        """
        if rules is None:
            rules = AssemblyRules()
        if isinstance(classid, type):
            classid = classid.__classid__
        if not isinstance(classid, str):
            raise TypeError("The first argument type '{}' is not allowed. "
                            "Must be a type or str.".format(type(classid)))
        if classid not in self.baseRegistry:
            raise AttributeError("Class Id '{}' not found in the base registry"
                                 .format(classid))
        Derived = self.baseRegistry[classid]  # userclass -> Derived

        # building list of classes in inheritance order from bases to the
        # last derived
        def inheritanceChain(c, bases_id, clist):
            if not isinstance(c, type):
                return
            classId = getattr(c, '__classid__', None)
            if classId is not None and classId not in bases_id:
                for x in c.__bases__:
                    inheritanceChain(x, bases_id, clist)
            if c not in clist:
                clist.append(c)

        clist = []
        inheritanceChain(Derived, Derived.__bases_classid__, clist)
        # clist contains list of classes in inheritance order
        schema = Schema(classid, rules)
        for theClass in clist:
            if hasattr(theClass, "expectedParameters"):
                # fill schema in order from base to derived
                theClass.expectedParameters(schema)

        return schema

    def create(self, *args):
        """
        The factory method to create the instance of class with "classId" that
        inherits from base class given to constructor using "input"
        configuration.
        The last argument is a flag to determine if the input configuration
        should be validated.

        Example:

                instance = Configurator(Shape).create("EditableCircle",
                                                      Hash("radius", 12.345))

        The factory method to create instance of class that inherits from base
        class given to constructor using input "configuration".
        The configuration should have "classId" of class to be created as a
        root element.  The last argument is a flag to determine if the input
        "configuration" should be validated.

        Example:

                configuration = Hash("EditableCircle.radius", 12.345)
                instance = Configurator(Shape).create(configuration)

        :param args: thus can have an arity of one to three:
                    - class id string
                    - config with classId as root key
                    - class id string, config
                    - config with classId as root key, validation flag
                    - class id string, config, validation flag

        """

        if len(args) == 3:
            classid = args[0]
            configuration = args[1]
            validation = args[2]
        elif len(args) == 2:
            if isinstance(args[1], bool):
                configuration = args[0]
                validation = args[1]
                classid = list(configuration.keys())[0]
                configuration = configuration[classid]
            else:
                classid = args[0]
                configuration = args[1]
                validation = True
        elif len(args) == 1:
            configuration = args[0]
            validation = True
            classid = list(configuration.keys())[0]
            configuration = configuration[classid]
        else:
            raise TypeError("Wrong number of arguments and/or their types")
        if isinstance(classid, type):
            classid = classid.__classid__
        if not isinstance(classid, str):
            raise TypeError("First argument 'classid' must be a python"
                            " string type")
        if classid not in self.baseRegistry:
            raise AttributeError("Unknown classid '{}' in base registry"
                                 .format(classid))
        Derived = self.baseRegistry[classid]

        schema = Configurator(Derived.__base_classid__).getSchema(
            classid, AssemblyRules())
        if not validation:
            return Derived(configuration)
        validator = Validator()
        result, error, validated = validator.validate(schema, configuration)
        if not result:
            raise RuntimeError(f"Validation Exception: {error}")
        return Derived(validated)

    def createNode(self, nodename, classid, configuration, validation=True):
        """
        The helper method to create instance of class specified by "classId"
        and derived from class given to constructor using sub-configuration
        specified by "nodeName" which has to be a part of input
        "configuration".

        The last argument is a flag to determine if the input "configuration"
        should be validated.
        """

        if isinstance(classid, type):
            classid = classid.__classid__
        if not isinstance(classid, str):
            raise TypeError("Second argument 'classid' must be a python"
                            " string type")
        if nodename in configuration:
            return self.create(classid, configuration[nodename], validation)
        raise AttributeError('Given nodeName "{}" is not part of input'
                             ' configuration'.format(nodename))

    def createChoice(self, choicename, configuration, validation=True):
        """
        The helper method to create the instance of class derived from base
        class given to constructor using "choiceName" and
        input "configuration".  The last argument is a flag to determine if
        the input configuration should be validated.
        """

        return self.create(configuration[choicename], validation)

    def createList(self, listname, input, validation=True):
        """
        The helper method to create the list of instances of classes derived
        from base class given to constructor using "listName"
        used as a key to the list and "input" configuration.  The last argument
        is a flag to determine if the input configuration
        should be validated.
        """

        if listname not in input:
            raise AttributeError('Given list name "{}" is not a part of input'
                                 ' configuration'.format(listname))
        instances = []
        for hash in input[listname]:
            instances.append(self.create(hash, validation))
        return instances

    def getRegisteredClasses(self):
        """
        Returns list of "classid"'s for all registered classes derived from
        base class given to constructor.
        """
        return list(self.baseRegistry.keys())

    @staticmethod
    def getRegisteredBaseClasses():
        """
        Returns all classid's of base classes registered in Configurator.
        """
        return list(Configurator.registry.keys())

__init__(classid)

The argument to the constructor may be the classid of a configurable class or configurable class itself: Example: c = Configurator(ConfigurableClass) or c = Configurator("ConfigurableClassId")

Source code in src/pythonKarabo/karabo/bound/configurator.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def __init__(self, classid):
    """
    The argument to the constructor may be the classid of a configurable
    class or configurable class itself:
    Example:
            c = Configurator(ConfigurableClass)
    or
            c = Configurator("ConfigurableClassId")
    """
    if isinstance(classid, type):
        classid = classid.__classid__
    if not isinstance(classid, str):
        raise TypeError("The argument type '{}' is not allowed. "
                        "Must be a class or a str.".format(type(classid)))
    if classid not in Configurator.registry:
        raise AttributeError(
            "Argument is not a class or classid of registered base class")
    self.baseRegistry = Configurator.registry[classid]
    assert classid in self.baseRegistry

create(*args)

The factory method to create the instance of class with "classId" that inherits from base class given to constructor using "input" configuration. The last argument is a flag to determine if the input configuration should be validated.

Example:

    instance = Configurator(Shape).create("EditableCircle",
                                          Hash("radius", 12.345))

The factory method to create instance of class that inherits from base class given to constructor using input "configuration". The configuration should have "classId" of class to be created as a root element. The last argument is a flag to determine if the input "configuration" should be validated.

Example:

    configuration = Hash("EditableCircle.radius", 12.345)
    instance = Configurator(Shape).create(configuration)

:param args: thus can have an arity of one to three: - class id string - config with classId as root key - class id string, config - config with classId as root key, validation flag - class id string, config, validation flag

Source code in src/pythonKarabo/karabo/bound/configurator.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def create(self, *args):
    """
    The factory method to create the instance of class with "classId" that
    inherits from base class given to constructor using "input"
    configuration.
    The last argument is a flag to determine if the input configuration
    should be validated.

    Example:

            instance = Configurator(Shape).create("EditableCircle",
                                                  Hash("radius", 12.345))

    The factory method to create instance of class that inherits from base
    class given to constructor using input "configuration".
    The configuration should have "classId" of class to be created as a
    root element.  The last argument is a flag to determine if the input
    "configuration" should be validated.

    Example:

            configuration = Hash("EditableCircle.radius", 12.345)
            instance = Configurator(Shape).create(configuration)

    :param args: thus can have an arity of one to three:
                - class id string
                - config with classId as root key
                - class id string, config
                - config with classId as root key, validation flag
                - class id string, config, validation flag

    """

    if len(args) == 3:
        classid = args[0]
        configuration = args[1]
        validation = args[2]
    elif len(args) == 2:
        if isinstance(args[1], bool):
            configuration = args[0]
            validation = args[1]
            classid = list(configuration.keys())[0]
            configuration = configuration[classid]
        else:
            classid = args[0]
            configuration = args[1]
            validation = True
    elif len(args) == 1:
        configuration = args[0]
        validation = True
        classid = list(configuration.keys())[0]
        configuration = configuration[classid]
    else:
        raise TypeError("Wrong number of arguments and/or their types")
    if isinstance(classid, type):
        classid = classid.__classid__
    if not isinstance(classid, str):
        raise TypeError("First argument 'classid' must be a python"
                        " string type")
    if classid not in self.baseRegistry:
        raise AttributeError("Unknown classid '{}' in base registry"
                             .format(classid))
    Derived = self.baseRegistry[classid]

    schema = Configurator(Derived.__base_classid__).getSchema(
        classid, AssemblyRules())
    if not validation:
        return Derived(configuration)
    validator = Validator()
    result, error, validated = validator.validate(schema, configuration)
    if not result:
        raise RuntimeError(f"Validation Exception: {error}")
    return Derived(validated)

createChoice(choicename, configuration, validation=True)

The helper method to create the instance of class derived from base class given to constructor using "choiceName" and input "configuration". The last argument is a flag to determine if the input configuration should be validated.

Source code in src/pythonKarabo/karabo/bound/configurator.py
217
218
219
220
221
222
223
224
225
def createChoice(self, choicename, configuration, validation=True):
    """
    The helper method to create the instance of class derived from base
    class given to constructor using "choiceName" and
    input "configuration".  The last argument is a flag to determine if
    the input configuration should be validated.
    """

    return self.create(configuration[choicename], validation)

createList(listname, input, validation=True)

The helper method to create the list of instances of classes derived from base class given to constructor using "listName" used as a key to the list and "input" configuration. The last argument is a flag to determine if the input configuration should be validated.

Source code in src/pythonKarabo/karabo/bound/configurator.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def createList(self, listname, input, validation=True):
    """
    The helper method to create the list of instances of classes derived
    from base class given to constructor using "listName"
    used as a key to the list and "input" configuration.  The last argument
    is a flag to determine if the input configuration
    should be validated.
    """

    if listname not in input:
        raise AttributeError('Given list name "{}" is not a part of input'
                             ' configuration'.format(listname))
    instances = []
    for hash in input[listname]:
        instances.append(self.create(hash, validation))
    return instances

createNode(nodename, classid, configuration, validation=True)

The helper method to create instance of class specified by "classId" and derived from class given to constructor using sub-configuration specified by "nodeName" which has to be a part of input "configuration".

The last argument is a flag to determine if the input "configuration" should be validated.

Source code in src/pythonKarabo/karabo/bound/configurator.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def createNode(self, nodename, classid, configuration, validation=True):
    """
    The helper method to create instance of class specified by "classId"
    and derived from class given to constructor using sub-configuration
    specified by "nodeName" which has to be a part of input
    "configuration".

    The last argument is a flag to determine if the input "configuration"
    should be validated.
    """

    if isinstance(classid, type):
        classid = classid.__classid__
    if not isinstance(classid, str):
        raise TypeError("Second argument 'classid' must be a python"
                        " string type")
    if nodename in configuration:
        return self.create(classid, configuration[nodename], validation)
    raise AttributeError('Given nodeName "{}" is not part of input'
                         ' configuration'.format(nodename))

getRegisteredBaseClasses() staticmethod

Returns all classid's of base classes registered in Configurator.

Source code in src/pythonKarabo/karabo/bound/configurator.py
251
252
253
254
255
256
@staticmethod
def getRegisteredBaseClasses():
    """
    Returns all classid's of base classes registered in Configurator.
    """
    return list(Configurator.registry.keys())

getRegisteredClasses()

Returns list of "classid"'s for all registered classes derived from base class given to constructor.

Source code in src/pythonKarabo/karabo/bound/configurator.py
244
245
246
247
248
249
def getRegisteredClasses(self):
    """
    Returns list of "classid"'s for all registered classes derived from
    base class given to constructor.
    """
    return list(self.baseRegistry.keys())

getSchema(classid, rules=AssemblyRules())

Get schema for class with "classid" derived from base class given to constructor using assembly "rules". Example: schema = Configurator(Shape).getSchema("Rectangle")

Source code in src/pythonKarabo/karabo/bound/configurator.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def getSchema(self, classid, rules=AssemblyRules()):
    """
    Get schema for class with "classid" derived from base class given to
    constructor using assembly "rules".
    Example:
            schema = Configurator(Shape).getSchema("Rectangle")
    """
    if rules is None:
        rules = AssemblyRules()
    if isinstance(classid, type):
        classid = classid.__classid__
    if not isinstance(classid, str):
        raise TypeError("The first argument type '{}' is not allowed. "
                        "Must be a type or str.".format(type(classid)))
    if classid not in self.baseRegistry:
        raise AttributeError("Class Id '{}' not found in the base registry"
                             .format(classid))
    Derived = self.baseRegistry[classid]  # userclass -> Derived

    # building list of classes in inheritance order from bases to the
    # last derived
    def inheritanceChain(c, bases_id, clist):
        if not isinstance(c, type):
            return
        classId = getattr(c, '__classid__', None)
        if classId is not None and classId not in bases_id:
            for x in c.__bases__:
                inheritanceChain(x, bases_id, clist)
        if c not in clist:
            clist.append(c)

    clist = []
    inheritanceChain(Derived, Derived.__bases_classid__, clist)
    # clist contains list of classes in inheritance order
    schema = Schema(classid, rules)
    for theClass in clist:
        if hasattr(theClass, "expectedParameters"):
            # fill schema in order from base to derived
            theClass.expectedParameters(schema)

    return schema

registerAsBaseClass(theClass) staticmethod

Register a class as a base class in the configurator

:param theClass: to be registered

Source code in src/pythonKarabo/karabo/bound/configurator.py
31
32
33
34
35
36
37
38
39
40
41
42
43
@staticmethod
def registerAsBaseClass(theClass):
    """Register a class as a base class in the configurator

    :param theClass: to be registered
    """
    if theClass.__classid__ not in Configurator.registry:
        theClass.__base_classid__ = theClass.__classid__
        theClass.__bases_classid__ = (theClass.__classid__,)
        Configurator.registry[theClass.__classid__] = {}
    # self-registering
    shrt = theClass.__classid__
    Configurator.registry[shrt][shrt] = theClass

registerClass(derived)

Register a derived class, i.e. a class deriving from a registered base class in the configuration system. :param derived: :return:

Source code in src/pythonKarabo/karabo/bound/configurator.py
70
71
72
73
74
75
76
77
78
def registerClass(self, derived):
    """
    Register a derived class, i.e. a class deriving from a registered
    base class in the configuration system.
    :param derived:
    :return:
    """
    self.baseRegistry[derived.__classid__] = derived
    return self