1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
|
import collections
import contextlib
import dataclasses
import datetime
import os
import time
from collections import defaultdict, Counter
from dataclasses import fields
from enum import IntEnum
from functools import lru_cache
from typing import (
List,
Optional,
TYPE_CHECKING,
cast,
FrozenSet,
Self,
Set,
Dict,
Tuple,
TypeVar,
)
from collections.abc import (
Callable,
Mapping,
Sequence,
Iterable,
Awaitable,
AsyncIterable,
)
import debputy.l10n as l10n
from debputy.commands.debputy_cmd.output import IOBasedOutputStyling
from debputy.dh.dh_assistant import (
extract_dh_addons_from_control,
DhSequencerData,
parse_drules_for_addons,
)
from debputy.exceptions import PureVirtualPathError, DebputyRuntimeError
from debputy.filesystem_scan import VirtualPathBase
from debputy.integration_detection import determine_debputy_integration_mode
from debputy.l10n import Translations
from debputy.lsp.config.debputy_config import DebputyConfig
from debputy.lsp.diagnostics import (
LintSeverity,
LINT_SEVERITY2LSP_SEVERITY,
DiagnosticData,
NATIVELY_LSP_SUPPORTED_SEVERITIES,
)
from debputy.lsp.spellchecking import Spellchecker, default_spellchecker
from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file
from debputy.packages import SourcePackage, BinaryPackage
from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
from debputy.plugin.api.spec import DebputyIntegrationMode
from debputy.util import _warn, T
from debputy.lsp.vendoring._deb822_repro.locatable import (
Range as TERange,
Position as TEPosition,
Locatable,
START_POSITION,
)
if TYPE_CHECKING:
import lsprotocol.types as types
from debputy.lsp.text_util import LintCapablePositionCodec
from debputy.lsp.maint_prefs import (
MaintainerPreferenceTable,
EffectiveFormattingPreference,
)
else:
import debputy.lsprotocol.types as types
# Type variable for helpers that are generic over `Locatable` elements.
L = TypeVar("L", bound=Locatable)
# Signature of an asynchronous linter: called with the `LintState`, returns an awaitable of `None`.
AsyncLinterImpl = Callable[["LintState"], Awaitable[None]]
# Signature of a formatter: called with the `LintState`, returns the text edits or `None`.
FormatterImpl = Callable[["LintState"], Optional[Sequence[types.TextEdit]]]
class AbortTaskError(DebputyRuntimeError):
    # NOTE(review): nothing in this part of the file raises or catches this error; judging by
    # the name it is presumably used to abort a running task - confirm against the callers.
    pass
# Diagnostic sources that `_check_diagnostic_source` accepts verbatim (no section suffix).
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITHOUT_SECTIONS: frozenset[str] = frozenset(
    {
        "debputy",
        "dpkg",
    }
)
# Diagnostic sources that `_check_diagnostic_source` only accepts when followed by a space and
# exactly one section (such as "Policy 3.4.1").
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITH_SECTIONS: frozenset[str] = frozenset(
    {
        "Policy",
        "DevRef",
    }
)
def te_position_to_lsp(te_position: "TEPosition") -> types.Position:
    """Convert a `TEPosition` into an LSP `Position`.

    :param te_position: The position to convert. Its `line_position` becomes the first
      argument of the LSP position and its `cursor_position` the second.
    :return: The resulting LSP position.
    """
    line = te_position.line_position
    character = te_position.cursor_position
    return types.Position(line, character)
def te_range_to_lsp(te_range: "TERange") -> types.Range:
    """Convert a `TERange` into an LSP `Range`.

    :param te_range: The range to convert; both `start_pos` and `end_pos` are converted
      with `te_position_to_lsp`.
    :return: The resulting LSP range.
    """
    start = te_position_to_lsp(te_range.start_pos)
    end = te_position_to_lsp(te_range.end_pos)
    return types.Range(start, end)
def with_range_in_continuous_parts(
    iterable: Iterable["L"],
    *,
    start_relative_to: "TEPosition" = START_POSITION,
) -> Iterable[tuple["TERange", "L"]]:
    """Pair each element with its range, assuming the elements follow each other directly.

    The first element is placed at `start_relative_to`; every following element is placed
    at the end position of the element before it.

    :param iterable: The elements to place, in order.
    :param start_relative_to: The position at which the first element starts.
    :return: An iterable of `(range, element)` pairs in the order of `iterable`.
    """
    cursor = start_relative_to
    for element in iterable:
        element_range = element.size().relative_to(cursor)
        yield element_range, element
        # The next element begins where this one ended.
        cursor = element_range.end_pos
@lru_cache
def _check_diagnostic_source(source: str) -> None:
    """Validate the `source` (authority reference) of a diagnostic.

    A source is valid if it is listed in `DIAG_SOURCE_WITHOUT_SECTIONS`, or if it consists
    of a name from `DIAG_SOURCE_WITH_SECTIONS` followed by a single space and one section.

    :param source: The source to validate.
    :raises ValueError: If the source is unknown or does not have exactly one section.
    """
    if source in DIAG_SOURCE_WITHOUT_SECTIONS:
        return
    # `str.split` always returns at least one element, so `authority` is always bound.
    authority, *sections = source.split(" ")
    if authority not in DIAG_SOURCE_WITH_SECTIONS:
        raise ValueError(
            f'Unknown diagnostic source: "{source}". If you are adding a new source, update lint_util.py'
        )
    if len(sections) != 1:
        raise ValueError(
            f'The diagnostic source "{source}" should have exactly one section associated with it.'
        )
@dataclasses.dataclass(slots=True)
class DebputyMetadata:
    """Information about how `debputy` is used by the package."""

    # The integration mode as determined by `determine_debputy_integration_mode` (may be `None`).
    debputy_integration_mode: DebputyIntegrationMode | None

    @classmethod
    def from_data(
        cls,
        source_fields: Mapping[str, str],
        dh_sequencer_data: DhSequencerData,
    ) -> Self:
        """Create the metadata from the source fields and the `dh` sequencer data.

        :param source_fields: The fields of the source stanza.
        :param dh_sequencer_data: The `dh` sequencer data; only its `sequences` are used.
        :return: A new instance holding the detected integration mode.
        """
        return cls(
            determine_debputy_integration_mode(
                source_fields,
                dh_sequencer_data.sequences,
            )
        )
@dataclasses.dataclass(slots=True, frozen=True)
class RelatedDiagnosticInformation:
    """A text range with a message that provides extra context for a diagnostic."""

    # The range the message applies to.
    text_range: "TERange"
    # The message to show for the range.
    message: str
    # URI of the document that `text_range` refers to.
    doc_uri: str

    def to_lsp(self, lint_state: "LintState") -> types.DiagnosticRelatedInformation:
        """Convert this instance into its LSP representation.

        :param lint_state: Provides the position codec and the lines used for converting
          the range into client units.
        :return: The LSP related information pointing at `doc_uri`.
        """
        client_range = lint_state.position_codec.range_to_client_units(
            lint_state.lines,
            te_range_to_lsp(self.text_range),
        )
        location = types.Location(self.doc_uri, client_range)
        return types.DiagnosticRelatedInformation(location, self.message)
@dataclasses.dataclass(frozen=True, slots=True)
class WorkspaceTextEditSupport:
supports_document_changes: bool = False
supported_resource_operation_edit_kinds: Sequence[types.ResourceOperationKind] = (
dataclasses.field(default_factory=list)
)
@property
def supports_versioned_text_edits(self) -> bool:
return (
self.supports_document_changes
or self.supported_resource_operation_edit_kinds
)
class LintState:
    """View of the file being checked together with the means to report diagnostics.

    Most properties and `_emit_diagnostic` raise `NotImplementedError` in this class and
    have to be provided by an implementation.
    """

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        """The plugin features known to the running `debputy` instance

        This is mostly of interest when working with `debputy.manifest`.
        """
        raise NotImplementedError

    @property
    def doc_uri(self) -> str:
        """The URI of the document being scanned.

        Useful when providing related location ranges.
        """
        raise NotImplementedError

    @property
    def doc_version(self) -> int | None:
        """The version of the document being scanned, if any."""
        raise NotImplementedError

    @property
    def source_root(self) -> VirtualPathBase | None:
        """The path to the unpacked source root directory, if available

        This is the directory that would contain the `debian/` directory. If you need the
        `debian/` directory, please use `debian_dir` instead. There may be cases where the
        source root is unavailable but the `debian/` directory is not.
        """
        raise NotImplementedError

    @property
    def debian_dir(self) -> VirtualPathBase | None:
        """The path to the `debian/` directory, if available"""
        raise NotImplementedError

    @property
    def path(self) -> str:
        """The filename or path of the file being scanned.

        The path may or may not be accessible to the running `debputy` instance, and the
        file on the file system (even if accessible) is not guaranteed to have the correct
        contents. When doing diagnostics for an editor, the editor often requests
        diagnostics for unsaved changes.
        """
        raise NotImplementedError

    @property
    def content(self) -> str:
        """The full contents of the file being checked"""
        raise NotImplementedError

    @property
    def lines(self) -> list[str]:
        # FIXME: Replace with `Sequence[str]` if possible
        """The contents of the file being checked as a list of lines

        Do **not** modify this list; it may be cached.
        """
        raise NotImplementedError

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The codec used for converting ranges into client units."""
        raise NotImplementedError

    @property
    def parsed_deb822_file_content(self) -> Deb822FileElement | None:
        """The contents of the file being checked, parsed as a deb822 file

        This may use a cached version of the parsed file and is therefore preferable to
        parsing the file manually from `content` or `lines`.

        Do **not** modify the returned value; it may be cached.
        """
        raise NotImplementedError

    @property
    def source_package(self) -> SourcePackage | None:
        """The source package (source stanza of `debian/control`).

        This is `None` if the `debian/control` file cannot be parsed as a deb822 file, or
        if the source stanza is not available.
        """
        raise NotImplementedError

    @property
    def binary_packages(self) -> Mapping[str, BinaryPackage] | None:
        """The binary packages (the Package stanzas of `debian/control`).

        This is `None` if the `debian/control` file cannot be parsed, or if no Package
        stanzas are available.
        """
        raise NotImplementedError

    @property
    def maint_preference_table(self) -> "MaintainerPreferenceTable":
        # TODO: Visible only for tests.
        raise NotImplementedError

    @property
    def effective_preference(self) -> Optional["EffectiveFormattingPreference"]:
        """The effective formatting preference, if any."""
        raise NotImplementedError

    @property
    def debputy_metadata(self) -> DebputyMetadata:
        """Information about `debputy` usage such as which integration mode is being used."""
        source_package = self.source_package
        if source_package:
            source_fields = source_package.fields
        else:
            # Without a source package there are no fields to base the detection on.
            source_fields = {}
        return DebputyMetadata.from_data(source_fields, self.dh_sequencer_data)

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Information about the use of the `dh` sequencer

        This covers which sequences are being used and whether the `dh` sequencer is used
        at all.
        """
        raise NotImplementedError

    @property
    def workspace_text_edit_support(self) -> WorkspaceTextEditSupport:
        """The workspace text edit capabilities that apply to this lint run."""
        raise NotImplementedError

    @property
    def debputy_config(self) -> DebputyConfig:
        """The `debputy` configuration in effect."""
        raise NotImplementedError

    def spellchecker(self) -> "Spellchecker":
        """Provide a spellchecker that ignores the package names of this package

        The names are the `Source` field of the source package (if present) and the names
        of the binary packages.
        """
        base_checker = default_spellchecker()
        package_names = set()
        src_pkg = self.source_package
        bin_pkgs = self.binary_packages
        if src_pkg:
            source_name = src_pkg.fields.get("Source")
            if source_name is not None:
                package_names.add(source_name)
        if bin_pkgs:
            package_names.update(bin_pkgs.keys())
        return base_checker.context_ignored_words(package_names)

    async def slow_iter(
        self,
        iterable: Iterable[T],
        *,
        yield_every: int = 100,
    ) -> AsyncIterable[T]:
        """Iterate over `iterable` as an async iterable

        This implementation yields every value as is; `yield_every` is not used by it.

        :param iterable: The values to iterate over.
        :param yield_every: Not used by this implementation.
        """
        for item in iterable:
            yield item

    def translation(self, domain: str) -> Translations:
        """Look up the translations for the given domain via `debputy.l10n`."""
        return l10n.translation(domain)

    def related_diagnostic_information(
        self,
        text_range: "TERange",
        message: str,
        *,
        doc_uri: str | None = None,
    ) -> RelatedDiagnosticInformation:
        """Create a related context for a diagnostic

        Related diagnostic information is typically highlighted together with the
        diagnostic. As an example, the terminal output of `debputy lint` displays the
        message and the selected range after the diagnostic itself.

        :param text_range: The text range to highlight.
        :param message: The message to associate with the provided text range.
        :param doc_uri: The URI of the document that the context is from. When omitted,
          the text range is assumed to be from the "current" file (the `doc_uri`
          attribute), which is also the default file for ranges passed to
          `emit_diagnostic`.
        :return: The related information, ready to be passed to `emit_diagnostic`.
        """
        effective_uri = self.doc_uri if doc_uri is None else doc_uri
        return RelatedDiagnosticInformation(
            text_range,
            message,
            doc_uri=effective_uri,
        )

    def emit_diagnostic(
        self,
        text_range: "TERange",
        diagnostic_msg: str,
        severity: LintSeverity,
        authority_reference: str,
        *,
        quickfixes: list[dict] | None = None,
        tags: list[types.DiagnosticTag] | None = None,
        related_information: list[RelatedDiagnosticInformation] | None = None,
        diagnostic_applies_to_another_file: str | None = None,
        enable_non_interactive_auto_fix: bool = True,
    ) -> None:
        """Emit a diagnostic for an issue detected in the current file.

        :param text_range: The text range to highlight in the file.
        :param diagnostic_msg: The message to show to the user for this diagnostic
        :param severity: The severity to associate with the diagnostic.
        :param authority_reference: A reference to the authority / guide that this
          diagnostic is a violation of.
          Use:
            * "Policy 3.4.1" for Debian Policy Manual section 3.4.1
              (replace the section number with the relevant number for your case)
            * "DevRef 6.2.2" for the Debian Developer Reference section 6.2.2
              (replace the section number with the relevant number for your case)
            * "debputy" for diagnostics without a reference or where `debputy` is the
              authority. (This is also used for cases where `debputy` filters the result.
              Like with spellchecking via hunspell, where `debputy` provides its own
              ignore list on top)
            * "dpkg" is likewise accepted without a section number.
          If you need a new reference, feel free to add it to this list.
        :param quickfixes: If provided, this is a list of possible fixes for this problem.
          Use the quickfixes provided in `debputy.lsp.quickfixes` such as
          `propose_correct_text_quick_fix`.
        :param tags: TODO: Not yet specified (currently uses LSP format).
        :param related_information: Provide additional context to the diagnostic. This can
          be used to define the source of a conflict. As an example, for duplicate
          definitions, this can be used to show where the definitions are.
          Every item should be created via the `related_diagnostic_information` method.
        :param enable_non_interactive_auto_fix: Allow non-interactive auto-fixing (such as
          via `debputy lint --auto-fix`) of this issue. Set to `False` if the check is
          likely to have false positives.
        :param diagnostic_applies_to_another_file: Special-case parameter for flagging
          invalid file names. Leave this one at `None`, unless you know you need it.
          It has non-obvious semantics and is primarily useful for reporting typos of
          filenames such as `debian/install`, etc.
        :raises ValueError: If `authority_reference` is not a valid diagnostic source.
        :raises NotImplementedError: If any related information refers to another document.
        """
        _check_diagnostic_source(authority_reference)
        lsp_severity = LINT_SEVERITY2LSP_SEVERITY[severity]

        diag_data: DiagnosticData = {
            "enable_non_interactive_auto_fix": enable_non_interactive_auto_fix,
        }
        # Severities without a native LSP counterpart are carried in the data payload.
        if severity not in NATIVELY_LSP_SUPPORTED_SEVERITIES:
            diag_data["lint_severity"] = severity
        if quickfixes:
            diag_data["quickfixes"] = quickfixes
        if diagnostic_applies_to_another_file is not None:
            diag_data["report_for_related_file"] = diagnostic_applies_to_another_file

        lsp_range_client_units = self.position_codec.range_to_client_units(
            self.lines,
            te_range_to_lsp(text_range),
        )

        related_lsp_format = None
        if related_information:
            for info in related_information:
                if info.doc_uri != self.doc_uri:
                    raise NotImplementedError(
                        "Ranges from another document will be wrong"
                    )
            related_lsp_format = [info.to_lsp(self) for info in related_information]

        self._emit_diagnostic(
            types.Diagnostic(
                lsp_range_client_units,
                diagnostic_msg,
                severity=lsp_severity,
                source=authority_reference,
                data=diag_data or None,
                tags=tags,
                related_information=related_lsp_format,
            )
        )

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Receive a diagnostic created by `emit_diagnostic` (implementation hook)."""
        raise NotImplementedError
# Shared text edit capabilities returned by `LintStateImpl.workspace_text_edit_support`:
# document changes are supported, no resource operation kinds are listed.
CLI_WORKSPACE_TEXT_EDIT_SUPPORT = WorkspaceTextEditSupport(
    supports_document_changes=True,
)
@dataclasses.dataclass(slots=True)
class LintStateImpl(LintState):
    """`LintState` implementation that keeps its data in plain fields.

    The parsed deb822 content and the `dh` sequencer data are computed on first access and
    cached until `clear_cache` is called. Diagnostics are collected in a list while
    `gather_diagnostics` is running.
    """

    plugin_feature_set: PluginProvidedFeatureSet = dataclasses.field(repr=False)
    maint_preference_table: "MaintainerPreferenceTable" = dataclasses.field(repr=False)
    source_root: VirtualPathBase | None
    debian_dir: VirtualPathBase | None
    path: str
    content: str
    lines: list[str]
    debputy_config: DebputyConfig
    source_package: SourcePackage | None = None
    binary_packages: Mapping[str, BinaryPackage] | None = None
    effective_preference: Optional["EffectiveFormattingPreference"] = None
    # The linter run by `gather_diagnostics`; `None` means no linter is available.
    lint_implementation: Optional["AsyncLinterImpl"] = None
    # Cache for `parsed_deb822_file_content`.
    _parsed_cache: Deb822FileElement | None = None
    # Cache for `dh_sequencer_data`.
    _dh_sequencer_cache: DhSequencerData | None = None
    # The diagnostics collected so far; only set while `gather_diagnostics` is running.
    _diagnostics: list[types.Diagnostic] | None = None

    @property
    def doc_uri(self) -> str:
        """A `file://` URI built from `path`."""
        path = self.path
        # NOTE(review): despite the variable name, joining with `os.path.curdir` does not
        # make the path absolute - confirm whether `os.path.abspath` was intended.
        abs_path = os.path.join(os.path.curdir, path)
        return f"file://{abs_path}"

    @property
    def doc_version(self) -> int | None:
        """Always `None` for this implementation."""
        return None

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The shared `LINTER_POSITION_CODEC` instance."""
        return LINTER_POSITION_CODEC

    @property
    def parsed_deb822_file_content(self) -> Deb822FileElement | None:
        """The lines parsed as a deb822 file (parsed on first access, then cached).

        Files with error tokens and files with duplicated fields are accepted by the parse.
        """
        cache = self._parsed_cache
        if cache is None:
            cache = parse_deb822_file(
                self.lines,
                accept_files_with_error_tokens=True,
                accept_files_with_duplicated_fields=True,
            )
            self._parsed_cache = cache
        return cache

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """The `dh` sequencer data (computed on first access, then cached).

        The sequences are collected from the `rules` file of `debian_dir` (when it exists
        and is a file) and from the fields of the source package (when available).
        """
        dh_sequencer_cache = self._dh_sequencer_cache
        if dh_sequencer_cache is None:
            debian_dir = self.debian_dir
            dh_sequences: set[str] = set()
            saw_dh = False
            src_pkg = self.source_package
            drules = debian_dir.get("rules") if debian_dir is not None else None
            if drules and drules.is_file:
                try:
                    with drules.open() as fd:
                        saw_dh = parse_drules_for_addons(fd, dh_sequences)
                except PureVirtualPathError:
                    # Deliberate best-effort: the rules file cannot be opened, so only the
                    # source package fields are considered.
                    pass
            if src_pkg:
                extract_dh_addons_from_control(src_pkg.fields, dh_sequences)

            dh_sequencer_cache = DhSequencerData(
                frozenset(dh_sequences),
                saw_dh,
            )
            self._dh_sequencer_cache = dh_sequencer_cache
        return dh_sequencer_cache

    @property
    def workspace_text_edit_support(self) -> WorkspaceTextEditSupport:
        """The shared `CLI_WORKSPACE_TEXT_EDIT_SUPPORT` instance."""
        return CLI_WORKSPACE_TEXT_EDIT_SUPPORT

    async def gather_diagnostics(self) -> list[types.Diagnostic]:
        """Run the lint implementation and return the diagnostics it emitted.

        :return: The diagnostics emitted via `emit_diagnostic` during the run.
        :raises RuntimeError: If a run is already in progress.
        :raises TypeError: If the instance was created without a lint implementation.
        """
        if self._diagnostics is not None:
            raise RuntimeError(
                "run_diagnostics cannot be run while it is already running"
            )
        linter = self.lint_implementation
        if linter is None:
            raise TypeError(
                "run_diagnostics cannot be run:"
                " LintState was created without a lint implementation (such as for reformat-only)"
            )
        self._diagnostics = diagnostics = []
        try:
            await linter(self)
        finally:
            # Always leave the "running" state, even when the linter raises. Otherwise
            # every later call would fail with the "already running" error.
            self._diagnostics = None
        return diagnostics

    def clear_cache(self) -> None:
        """Drop the cached parsed deb822 content and the cached `dh` sequencer data."""
        self._parsed_cache = None
        self._dh_sequencer_cache = None

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Record a diagnostic for the current `gather_diagnostics` run.

        :raises TypeError: If called while `gather_diagnostics` is not running.
        """
        diagnostics = self._diagnostics
        if diagnostics is None:
            raise TypeError("Cannot run emit_diagnostic outside of gather_diagnostics")
        diagnostics.append(diagnostic)
class LintDiagnosticResultState(IntEnum):
    """How a reported diagnostic was (or can be) handled."""

    # Default state used by `LintReport.report_diagnostic`.
    REPORTED = 1
    # Rendered by `TermLintReport` with the "[LSP interactive quickfix]" tag.
    MANUAL_FIXABLE = 2
    # Rendered by `TermLintReport` with the "[Correctable via --auto-fix]" tag.
    AUTO_FIXABLE = 3
    # Rendered by `TermLintReport` as "auto-fixing" and without additional context.
    FIXED = 4
@dataclasses.dataclass(slots=True, frozen=True)
class LintDiagnosticResult:
    """A diagnostic together with the details recorded by `LintReport.report_diagnostic`."""

    # The diagnostic that was reported.
    diagnostic: types.Diagnostic
    # How the diagnostic was (or can be) handled.
    result_state: LintDiagnosticResultState
    # Set by `LintReport.report_diagnostic` when the diagnostic has a broken range or had
    # no severity; `None` otherwise.
    invalid_marker: RuntimeError | None
    # Whether the diagnostic concerns the file as a whole rather than a range in it.
    is_file_level_diagnostic: bool
    # Whether the range of the diagnostic does not fit in the file.
    has_broken_range: bool
    # Whether the diagnostic was reported without a severity.
    missing_severity: bool
    # The file in which the diagnostic was discovered.
    discovered_in: str
    # The "report_for_related_file" value from the diagnostic data, if it was a string.
    report_for_related_file: str | None
class LintReport:
    """Collects the diagnostics of a lint run together with per-file durations.

    Subclasses can override `process_diagnostic` and `finish_report` to present the result.
    """

    def __init__(self) -> None:
        self.diagnostics_count: Counter[types.DiagnosticSeverity] = Counter()
        self.diagnostics_by_file: Mapping[str, list[LintDiagnosticResult]] = (
            defaultdict(list)
        )
        self.number_of_invalid_diagnostics: int = 0
        # NOTE(review): nothing in this class updates this counter - confirm whether it is
        # maintained by code outside of this class.
        self.number_of_broken_diagnostics: int = 0
        self.lint_state: LintState | None = None
        self.start_timestamp = datetime.datetime.now()
        self.durations: dict[str, float] = collections.defaultdict(float)
        self._timer = time.perf_counter()

    @contextlib.contextmanager
    def line_state(self, lint_state: LintState) -> Iterable[None]:
        """Make `lint_state` the active lint state for the duration of the context.

        The time spent inside the context is added to `durations` under the path of
        `lint_state`. If another lint state was active on entry, the time since the timer
        was last reset is first added under the path of that state, and that state is
        made active again on exit.

        :param lint_state: The lint state to make active.
        """
        outer_state = self.lint_state
        if outer_state is not None:
            outer_path = outer_state.path
            elapsed = time.perf_counter() - self._timer
            self.durations[outer_path] += elapsed

        self.lint_state = lint_state

        try:
            self._timer = time.perf_counter()
            yield
        finally:
            finished_at = time.perf_counter()
            elapsed = finished_at - self._timer
            self.durations[lint_state.path] += elapsed
            self._timer = finished_at
            self.lint_state = outer_state

    def report_diagnostic(
        self,
        diagnostic: types.Diagnostic,
        *,
        result_state: LintDiagnosticResultState = LintDiagnosticResultState.REPORTED,
        in_file: str | None = None,
    ) -> None:
        """Record a diagnostic and pass it on to `process_diagnostic`.

        Must be called while a lint state is active (see `line_state`).

        :param diagnostic: The diagnostic to record. If it has no severity, its severity
          is set to `Warning` and the diagnostic is counted as invalid.
        :param result_state: How the diagnostic was (or can be) handled.
        :param in_file: The file the diagnostic was discovered in. Defaults to the path of
          the active lint state.
        """
        state = self.lint_state
        assert state is not None
        discovered_in_file = state.path if in_file is None else in_file
        target_file = discovered_in_file

        severity = diagnostic.severity
        missing_severity = severity is None
        if missing_severity:
            self.number_of_invalid_diagnostics += 1
            severity = types.DiagnosticSeverity.Warning
            diagnostic.severity = severity

        lines = state.lines
        start_pos = diagnostic.range.start
        end_pos = diagnostic.range.end

        report_for_related_file = None
        diag_data = diagnostic.data
        if isinstance(diag_data, dict):
            candidate = diag_data.get("report_for_related_file")
            if isinstance(candidate, str):
                report_for_related_file = candidate
                target_file = candidate
                # Force it to exist in self.durations, since subclasses can use .items() or "foo" in self.durations.
                self.durations.setdefault(target_file, 0)

        if report_for_related_file is not None:
            is_file_level_diagnostic = True
        else:
            is_file_level_diagnostic = _is_file_level_diagnostic(
                lines,
                start_pos.line,
                start_pos.character,
                end_pos.line,
                end_pos.character,
            )

        has_broken_range = not is_file_level_diagnostic and (
            end_pos.line > len(lines) or start_pos.line < 0
        )

        error_marker: RuntimeError | None = (
            RuntimeError("Registration Marker for invalid diagnostic")
            if has_broken_range or missing_severity
            else None
        )

        diagnostic_result = LintDiagnosticResult(
            diagnostic,
            result_state,
            error_marker,
            is_file_level_diagnostic,
            has_broken_range,
            missing_severity,
            report_for_related_file=report_for_related_file,
            discovered_in=discovered_in_file,
        )

        self.diagnostics_by_file[target_file].append(diagnostic_result)
        self.diagnostics_count[severity] += 1
        self.process_diagnostic(target_file, state, diagnostic_result)

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Called by `report_diagnostic` for every recorded diagnostic."""
        # Subclass hook
        pass

    def finish_report(self) -> None:
        """Hook for subclasses; does nothing in this class."""
        # Subclass hook
        pass
# Maps each LSP diagnostic severity to the `debputy` lint severity used by `debputy_severity`
# when the diagnostic data does not carry an explicit "lint_severity".
_LS2DEBPUTY_SEVERITY: Mapping[types.DiagnosticSeverity, LintSeverity] = {
    types.DiagnosticSeverity.Error: "error",
    types.DiagnosticSeverity.Warning: "warning",
    types.DiagnosticSeverity.Information: "informational",
    types.DiagnosticSeverity.Hint: "pedantic",
}
def _severity_tag_renderer(fallback_label: str, fg: str):
    """Create the callable that renders the terminal tag of one severity.

    :param fallback_label: The label to render when the caller provides no lint tag.
    :param fg: The foreground color of the tag.
    :return: A callable `(fo, lint_tag=None)` returning the colored tag.
    """

    def _render(fo, lint_tag=None):
        label = lint_tag if lint_tag else fallback_label
        return fo.colored(
            label,
            fg=fg,
            bg="black",
            style="bold",
        )

    return _render


# Maps each LSP severity to the callable rendering its (bold, black background) terminal tag.
_TERM_SEVERITY2TAG = {
    types.DiagnosticSeverity.Error: _severity_tag_renderer("error", "red"),
    types.DiagnosticSeverity.Warning: _severity_tag_renderer("warning", "yellow"),
    types.DiagnosticSeverity.Information: _severity_tag_renderer(
        "informational", "blue"
    ),
    types.DiagnosticSeverity.Hint: _severity_tag_renderer("pedantic", "green"),
}
def debputy_severity(diagnostic: types.Diagnostic) -> LintSeverity:
    """Determine the `debputy` lint severity of a diagnostic.

    An explicit "lint_severity" in the diagnostic data takes precedence. Otherwise, the
    LSP severity is translated via `_LS2DEBPUTY_SEVERITY`.

    :param diagnostic: The diagnostic to inspect.
    :return: The lint severity; "warning" when the diagnostic has no severity or the
      severity is not in the mapping.
    """
    data = diagnostic.data
    if isinstance(data, dict):
        explicit = cast("LintSeverity", data.get("lint_severity"))
        if explicit is not None:
            return explicit
    lsp_severity = diagnostic.severity
    if lsp_severity is None:
        return "warning"
    return _LS2DEBPUTY_SEVERITY.get(lsp_severity, "warning")
class TermLintReport(LintReport):
    """`LintReport` that prints every diagnostic as soon as it is reported."""

    def __init__(self, fo: IOBasedOutputStyling) -> None:
        """
        :param fo: The output styling used for coloring the printed text.
        """
        super().__init__()
        self.fo = fo

    def finish_report(self) -> None:
        # Nothing to do for now
        pass

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Print the diagnostic followed by the lines it applies to.

        The context lines (and the related information) are omitted for diagnostics that
        were fixed, for file-level diagnostics, and for diagnostics with a broken range.

        :param filename: The file name to show for the diagnostic.
        :param lint_state: The lint state providing the lines of the file.
        :param diagnostic_result: The diagnostic to print.
        """
        diagnostic = diagnostic_result.diagnostic
        fo = self.fo
        severity = diagnostic.severity
        assert severity is not None
        if diagnostic_result.result_state != LintDiagnosticResultState.FIXED:
            tag_unresolved = _TERM_SEVERITY2TAG[severity]
            lint_tag: LintSeverity | None = debputy_severity(diagnostic)
            tag = tag_unresolved(fo, lint_tag)
        else:
            tag = fo.colored(
                "auto-fixing",
                fg="magenta",
                bg="black",
                style="bold",
            )

        if diagnostic_result.is_file_level_diagnostic:
            start_line = 0
            start_position = 0
            end_line = 0
            end_position = 0
        else:
            start_line = diagnostic.range.start.line
            start_position = diagnostic.range.start.character
            end_line = diagnostic.range.end.line
            end_position = diagnostic.range.end.character

        authority = diagnostic.source
        assert authority is not None
        diag_tags = f" [{authority}]"
        lines = lint_state.lines
        # Width of the largest line number, so the printed line numbers align.
        line_no_format_width = len(str(len(lines)))

        if diagnostic_result.result_state == LintDiagnosticResultState.AUTO_FIXABLE:
            diag_tags += "[Correctable via --auto-fix]"
        elif diagnostic_result.result_state == LintDiagnosticResultState.MANUAL_FIXABLE:
            diag_tags += "[LSP interactive quickfix]"

        code = f"[{diagnostic.code}]: " if diagnostic.code else ""
        msg = f"{code}{diagnostic.message}"

        print(
            f"{tag}: File: {filename}:{start_line+1}:{start_position}:{end_line+1}:{end_position}: {msg}{diag_tags}",
        )
        if diagnostic_result.missing_severity:
            _warn(
                " This warning did not have an explicit severity; Used Warning as a fallback!"
            )
        if diagnostic_result.result_state == LintDiagnosticResultState.FIXED:
            # If it is fixed, there is no reason to show additional context.
            return
        if diagnostic_result.is_file_level_diagnostic:
            print(" File-level diagnostic")
            return
        if diagnostic_result.has_broken_range:
            _warn(
                "Bug in the underlying linter: The line numbers of the warning does not fit in the file..."
            )
            return
        self._print_range_context(diagnostic.range, lines, line_no_format_width)
        related_info_list = diagnostic.related_information or []
        hint_tag = fo.colored(
            "Related information",
            fg="magenta",
            bg="black",
            style="bold",
        )
        for related_info in related_info_list:
            # Only ranges in the current document can be shown with the lines at hand.
            if related_info.location.uri != lint_state.doc_uri:
                continue
            print(f" {hint_tag}: {related_info.message}")
            self._print_range_context(
                related_info.location.range, lines, line_no_format_width
            )

    def _print_range_context(
        self,
        print_range: types.Range,
        lines: list[str],
        line_no_format_width: int,
    ) -> None:
        """Print the lines covered by `print_range` with the range highlighted.

        :param print_range: The range to show.
        :param lines: The lines of the file that the range refers to.
        :param line_no_format_width: The width to which the line numbers are padded.
        """
        lines_to_print = _lines_to_print(print_range)
        fo = self.fo
        start_line = print_range.start.line
        # Clamp to the file: a range may end on line `len(lines)` with a non-zero
        # character, and the ranges of related information are not validated at all.
        # Without the clamp, such ranges would raise an IndexError below.
        first_line = max(start_line, 0)
        end_line_exclusive = min(start_line + lines_to_print, len(lines))
        for line_no in range(first_line, end_line_exclusive):
            line = _highlight_range(fo, lines[line_no], line_no, print_range)
            print(f" {line_no + 1:{line_no_format_width}}: {line}")
class LinterPositionCodec:
    """Position codec for the linter where client units are plain character counts.

    Apart from `position_from_client_units`, which clamps the line of the position,
    all conversions return their input unchanged.
    """

    def client_num_units(self, chars: str):
        """Return the number of client units of `chars` (its length)."""
        return len(chars)

    def position_from_client_units(
        self,
        lines: list[str],
        position: types.Position,
    ) -> types.Position:
        """Convert a position from client units, clamping its line to the file.

        :param lines: The lines of the file.
        :param position: The position to convert.
        :return: Position `0:0` when there are no lines, the end of the last line when the
          line of the position is past the last line, and the position itself otherwise.
        """
        if not lines:
            return types.Position(0, 0)
        line_count = len(lines)
        if position.line < line_count:
            return position
        return types.Position(line_count - 1, self.client_num_units(lines[-1]))

    def position_to_client_units(
        self,
        _lines: list[str],
        position: types.Position,
    ) -> types.Position:
        """Return `position` unchanged."""
        return position

    def range_from_client_units(
        self, _lines: list[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range

    def range_to_client_units(
        self, _lines: list[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range
# Shared codec instance; returned by `LintStateImpl.position_codec`.
LINTER_POSITION_CODEC = LinterPositionCodec()
def _lines_to_print(range_: types.Range) -> int:
    """Compute how many lines have to be printed to show the given range.

    :param range_: The range to show.
    :return: The number of lines between the start and the end line, plus one when the
      range extends into the end line (that is, its end character is greater than 0).
    """
    start, end = range_.start, range_.end
    line_span = end.line - start.line
    return line_span + 1 if end.character > 0 else line_span
def _highlight_range(
    fo: IOBasedOutputStyling,
    line: str,
    line_no: int,
    range_: types.Range,
) -> str:
    """Render a line with the part covered by the range highlighted in bold red.

    :param fo: The output styling used for coloring.
    :param line: The line to render; trailing newline characters are removed.
    :param line_no: The number of the line, in the numbering used by `range_`.
    :param range_: The range to highlight.
    :return: The line without trailing newline characters. The highlight starts at the
      start character on the start line of the range (otherwise at the beginning of the
      line) and stops at the end character on the end line of the range (otherwise at
      the end of the line).
    """
    text = line.rstrip("\r\n")
    highlight_from = range_.start.character if line_no == range_.start.line else 0
    highlight_to = range_.end.character if line_no == range_.end.line else len(text)
    before = text[0:highlight_from]
    after = text[highlight_to:]
    highlighted = fo.colored(text[highlight_from:highlight_to], fg="red", style="bold")
    return before + highlighted + after
def _is_file_level_diagnostic(
lines: list[str],
start_line: int,
start_position: int,
end_line: int,
end_position: int,
) -> bool:
if start_line != 0 or start_position != 0:
return False
line_count = len(lines)
if end_line + 1 == line_count and end_position == 0:
return True
return end_line == line_count and line_count and end_position == len(lines[-1])
|