This commit is contained in:
Iliyan Angelov
2025-09-19 11:58:53 +03:00
parent 306b20e24a
commit 6b247e5b9f
11423 changed files with 1500615 additions and 778 deletions

View File

@@ -0,0 +1,32 @@
# The MIT License (MIT)
#
# Copyright (c) 2016 Adam Schubert
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Enum to define the casing types for the Cron Expression description
"""
import enum
@enum.unique
class CasingTypeEnum(enum.IntEnum):
Title = 1
Sentence = 2
LowerCase = 3

View File

@@ -0,0 +1,39 @@
# The MIT License (MIT)
#
# Copyright (c) 2016 Adam Schubert
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import enum
@enum.unique
class DescriptionTypeEnum(enum.IntEnum):
"""DescriptionTypeEnum
"""
FULL = 1
TIMEOFDAY = 2
SECONDS = 3
MINUTES = 4
HOURS = 5
DAYOFWEEK = 6
MONTH = 7
DAYOFMONTH = 8
YEAR = 9

View File

@@ -0,0 +1,59 @@
# The MIT License (MIT)
#
# Copyright (c) 2016 Adam Schubert
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
class MissingFieldException(Exception):
"""Deprecated, use MissingFieldError
"""
def __init__(self, message: str) -> None:
"""Initialize MissingFieldException
Args:
message: Message of exception
"""
super().__init__(f"Field '{message}' not found.")
class FormatException(Exception):
"""Deprecated use FormatError
"""
class WrongArgumentException(Exception):
"""Deprecated, use WrongArgumentError
"""
class MissingFieldError(MissingFieldException):
"""Exception for cases when something is missing
"""
class FormatError(FormatException):
"""Exception for cases when something has wrong format
"""
class WrongArgumentError(WrongArgumentException):
"""Exception for cases when wrong argument is passed
"""

View File

@@ -0,0 +1,664 @@
# The MIT License (MIT)
#
# Copyright (c) 2016 Adam Schubert
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import annotations
import calendar
import datetime
import re
from typing import Callable, TypedDict
from typing_extensions import Unpack
from .CasingTypeEnum import CasingTypeEnum
from .DescriptionTypeEnum import DescriptionTypeEnum
from .Exception import FormatError, WrongArgumentError
from .ExpressionParser import ExpressionParser
from .GetText import GetText
from .Options import Options
from .StringBuilder import StringBuilder
class OptionsKwargs(TypedDict, total=False):
use_24hour_time_format: bool
locale_code: str
casing_type: CasingTypeEnum
verbose: bool
day_of_week_start_index_zero: bool
locale_location: str | None
class ExpressionDescriptor:
"""Converts a Cron Expression into a human readable string
"""
_special_characters = ("/", "-", ",", "*")
_expression = ""
_options: Options
_expression_parts: list[str]
def __init__(self, expression: str, options: Options | None=None, **kwargs: Unpack[OptionsKwargs]) -> None:
"""Initializes a new instance of the ExpressionDescriptor
Args:
expression: The cron expression string
options: Options to control the output description
Raises:
WrongArgumentException: if kwarg is unknown
"""
if options is None:
options = Options()
self._expression = expression
self._options = options
self._expression_parts = []
# if kwargs in _options, overwrite it, if not raise exception
for kwarg, value in kwargs.items():
if hasattr(self._options, kwarg):
setattr(self._options, kwarg, value)
else:
msg = f"Unknown {kwarg} configuration argument"
raise WrongArgumentError(msg)
# Initializes localization
self.get_text = GetText(options.locale_code, options.locale_location)
# Parse expression
parser = ExpressionParser(self._expression, self._options)
self._expression_parts = parser.parse()
def translate(self, message: str) -> str:
return self.get_text.trans.gettext(message)
def get_description(self, description_type: DescriptionTypeEnum = DescriptionTypeEnum.FULL) -> str:
"""Generates a humanreadable string for the Cron Expression
Args:
description_type: Which part(s) of the expression to describe
Returns:
The cron expression description
Raises:
Exception:
"""
choices = {
DescriptionTypeEnum.FULL: self.get_full_description,
DescriptionTypeEnum.TIMEOFDAY: self.get_time_of_day_description,
DescriptionTypeEnum.HOURS: self.get_hours_description,
DescriptionTypeEnum.MINUTES: self.get_minutes_description,
DescriptionTypeEnum.SECONDS: self.get_seconds_description,
DescriptionTypeEnum.DAYOFMONTH: self.get_day_of_month_description,
DescriptionTypeEnum.MONTH: self.get_month_description,
DescriptionTypeEnum.DAYOFWEEK: self.get_day_of_week_description,
DescriptionTypeEnum.YEAR: self.get_year_description,
}
return choices.get(description_type, self.get_seconds_description)()
def get_full_description(self) -> str:
"""Generates the FULL description
Returns:
The FULL description
Raises:
FormatException: if formatting fails
"""
try:
time_segment = self.get_time_of_day_description()
day_of_month_desc = self.get_day_of_month_description()
month_desc = self.get_month_description()
day_of_week_desc = self.get_day_of_week_description()
year_desc = self.get_year_description()
description = f"{time_segment}{day_of_month_desc}{day_of_week_desc}{month_desc}{year_desc}"
description = self.transform_verbosity(description, use_verbose_format=self._options.verbose)
description = ExpressionDescriptor.transform_case(description, self._options.casing_type)
except Exception as e:
description = self.translate(
"An error occurred when generating the expression description. Check the cron expression syntax.",
)
raise FormatError(description) from e
return description
def get_time_of_day_description(self) -> str:
"""Generates a description for only the TIMEOFDAY portion of the expression
Returns:
The TIMEOFDAY description
"""
seconds_expression = self._expression_parts[0]
minute_expression = self._expression_parts[1]
hour_expression = self._expression_parts[2]
description = StringBuilder()
# handle special cases first
if any(exp in minute_expression for exp in self._special_characters) is False and \
any(exp in hour_expression for exp in self._special_characters) is False and \
any(exp in seconds_expression for exp in self._special_characters) is False:
# specific time of day (i.e. 10 14)
description.append(self.translate("At "))
description.append(
self.format_time(
hour_expression,
minute_expression,
seconds_expression))
elif seconds_expression == "" and "-" in minute_expression and \
"," not in minute_expression and \
any(exp in hour_expression for exp in self._special_characters) is False:
# minute range in single hour (i.e. 0-10 11)
minute_parts = minute_expression.split("-")
description.append(self.translate("Every minute between {0} and {1}").format(
self.format_time(hour_expression, minute_parts[0]), self.format_time(hour_expression, minute_parts[1])))
elif seconds_expression == "" and "," in hour_expression and "-" not in hour_expression and \
any(exp in minute_expression for exp in self._special_characters) is False:
# hours list with single minute (o.e. 30 6,14,16)
hour_parts = hour_expression.split(",")
description.append(self.translate("At"))
for i, hour_part in enumerate(hour_parts):
description.append(" ")
description.append(self.format_time(hour_part, minute_expression))
if i < (len(hour_parts) - 2):
description.append(",")
if i == len(hour_parts) - 2:
description.append(self.translate(" and"))
else:
# default time description
seconds_description = self.get_seconds_description()
minutes_description = self.get_minutes_description()
hours_description = self.get_hours_description()
description.append(seconds_description)
if description and minutes_description:
description.append(", ")
description.append(minutes_description)
if description and hours_description:
description.append(", ")
description.append(hours_description)
return str(description)
def get_seconds_description(self) -> str:
"""Generates a description for only the SECONDS portion of the expression
Returns:
The SECONDS description
"""
def get_description_format(s: str) -> str:
if s == "0":
return ""
try:
if int(s) < 20:
return self.translate("at {0} seconds past the minute")
return self.translate("at {0} seconds past the minute [grThen20]") or self.translate("at {0} seconds past the minute")
except ValueError:
return self.translate("at {0} seconds past the minute")
return self.get_segment_description(
self._expression_parts[0],
self.translate("every second"),
lambda s: s,
lambda s: self.translate("every {0} seconds").format(s),
lambda _: self.translate("seconds {0} through {1} past the minute"),
get_description_format,
lambda _: self.translate(", second {0} through second {1}") or self.translate(", {0} through {1}"),
)
def get_minutes_description(self) -> str:
"""Generates a description for only the MINUTE portion of the expression
Returns:
The MINUTE description
"""
seconds_expression = self._expression_parts[0]
def get_description_format(s: str) -> str:
if s == "0" and seconds_expression == "":
return ""
try:
if int(s) < 20:
return self.translate("at {0} minutes past the hour")
return self.translate("at {0} minutes past the hour [grThen20]") or self.translate("at {0} minutes past the hour")
except ValueError:
return self.translate("at {0} minutes past the hour")
return self.get_segment_description(
self._expression_parts[1],
self.translate("every minute"),
lambda s: s,
lambda s: self.translate("every {0} minutes").format(s),
lambda _: self.translate("minutes {0} through {1} past the hour"),
get_description_format,
lambda _: self.translate(", minute {0} through minute {1}") or self.translate(", {0} through {1}"),
)
def get_hours_description(self) -> str:
"""Generates a description for only the HOUR portion of the expression
Returns:
The HOUR description
"""
expression = self._expression_parts[2]
return self.get_segment_description(
expression,
self.translate("every hour"),
lambda s: self.format_time(s, "0"),
lambda s: self.translate("every {0} hours").format(s),
lambda _: self.translate("between {0} and {1}"),
lambda _: self.translate("at {0}"),
lambda _: self.translate(", hour {0} through hour {1}") or self.translate(", {0} through {1}"),
)
def get_day_of_week_description(self) -> str:
"""Generates a description for only the DAYOFWEEK portion of the expression
Returns:
The DAYOFWEEK description
"""
if self._expression_parts[5] == "*":
# DOW is specified as * so we will not generate a description and defer to DOM part.
# Otherwise, we could get a contradiction like "on day 1 of the month, every day"
# or a dupe description like "every day, every day".
return ""
def get_day_name(s: str) -> str:
exp = s
if "#" in s:
exp, _ = s.split("#", 2)
elif "L" in s:
exp = exp.replace("L", "")
return ExpressionDescriptor.number_to_day(int(exp))
def get_format(s: str) -> str:
if "#" in s:
day_of_week_of_month = s[s.find("#") + 1:]
try:
day_of_week_of_month_number = int(day_of_week_of_month)
choices = {
1: self.translate("first"),
2: self.translate("second"),
3: self.translate("third"),
4: self.translate("fourth"),
5: self.translate("fifth"),
}
day_of_week_of_month_description = choices.get(day_of_week_of_month_number, "")
except ValueError:
day_of_week_of_month_description = ""
formatted = "{}{}{}".format(self.translate(", on the "), day_of_week_of_month_description, self.translate(" {0} of the month"))
elif "L" in s:
formatted = self.translate(", on the last {0} of the month")
else:
formatted = self.translate(", only on {0}")
return formatted
return self.get_segment_description(
self._expression_parts[5],
self.translate(", every day"),
lambda s: get_day_name(s),
lambda s: self.translate(", every {0} days of the week").format(s),
lambda _: self.translate(", {0} through {1}"),
lambda s: get_format(s),
lambda _: self.translate(", {0} through {1}"),
)
def get_month_description(self) -> str:
"""Generates a description for only the MONTH portion of the expression
Returns:
The MONTH description
"""
return self.get_segment_description(
self._expression_parts[4],
"",
lambda s: datetime.date(datetime.datetime.now(tz=datetime.timezone.utc).date().year, int(s), 1).strftime("%B"),
lambda s: self.translate(", every {0} months").format(s),
lambda _: self.translate(", month {0} through month {1}") or self.translate(", {0} through {1}"),
lambda _: self.translate(", only in {0}"),
lambda _: self.translate(", month {0} through month {1}") or self.translate(", {0} through {1}"),
)
def get_day_of_month_description(self) -> str:
"""Generates a description for only the DAYOFMONTH portion of the expression
Returns:
The DAYOFMONTH description
"""
expression = self._expression_parts[3]
if expression == "L":
description = self.translate(", on the last day of the month")
elif expression in ("LW", "WL"):
description = self.translate(", on the last weekday of the month")
else:
regex = re.compile(r"(\d{1,2}W)|(W\d{1,2})")
m = regex.match(expression)
if m: # if matches
day_number = int(m.group().replace("W", ""))
day_string = self.translate("first weekday") if day_number == 1 else self.translate("weekday nearest day {0}").format(day_number)
description = self.translate(", on the {0} of the month").format(day_string)
elif expression == "*" and self._expression_parts[5] != "*":
# DOW is specified, but DOM is *, so do not generate DOM description.
# Otherwise, we could get a contradiction like "every day, on Tuesday"
description = ""
else:
# Handle "last day offset"(i.e.L - 5: "5 days before the last day of the month")
regex = re.compile(r"L-(\d{1,2})")
m = regex.match(expression)
if m: # if matches
off_set_days = m.group(1)
description = self.translate(", {0} days before the last day of the month").format(off_set_days)
else:
description = self.get_segment_description(
expression,
self.translate(", every day"),
lambda s: s,
lambda s: self.translate(", every day") if s == "1" else self.translate(", every {0} days"),
lambda _: self.translate(", between day {0} and {1} of the month"),
lambda _: self.translate(", on day {0} of the month"),
lambda _: self.translate(", {0} through {1}"),
)
return description
def get_year_description(self) -> str:
"""Generates a description for only the YEAR portion of the expression
Returns:
The YEAR description
"""
def format_year(s: str) -> str:
regex = re.compile(r"^\d+$")
if regex.match(s):
year_int = int(s)
if year_int < 1900:
return str(year_int)
return datetime.date(year_int, 1, 1).strftime("%Y")
return s
return self.get_segment_description(
self._expression_parts[6],
"",
lambda s: format_year(s),
lambda s: self.translate(", every {0} years").format(s),
lambda _: self.translate(", year {0} through year {1}") or self.translate(", {0} through {1}"),
lambda _: self.translate(", only in {0}"),
lambda _: self.translate(", year {0} through year {1}") or self.translate(", {0} through {1}"),
)
def get_segment_description(
self,
expression: str,
all_description: str,
get_single_item_description: Callable[[str], str],
get_interval_description_format: Callable[[str], str],
get_between_description_format: Callable[[str], str],
get_description_format: Callable[[str], str],
get_range_format: Callable[[str], str],
) -> str:
"""Returns segment description
Args:
expression: Segment to descript
all_description: *
get_single_item_description: 1
get_interval_description_format: 1/2
get_between_description_format: 1-2
get_description_format: format get_single_item_description
get_range_format: function that formats range expressions depending on cron parts
Returns:
segment description
"""
if not expression:
return ""
if expression == "*":
return all_description
if not any(ext in expression for ext in ["/", "-", ","]):
return get_description_format(expression).format(get_single_item_description(expression))
if "/" in expression:
segments = expression.split("/")
description = get_interval_description_format(segments[1]).format(segments[1])
# interval contains 'between' piece (i.e. 2-59/3 )
if "-" in segments[0]:
between_segment_description = self.generate_between_segment_description(
segments[0],
get_between_description_format,
get_single_item_description,
)
if not between_segment_description.startswith(", "):
description += ", "
description += between_segment_description
elif not any(ext in segments[0] for ext in ["*", ","]):
range_item_description = get_description_format(segments[0]).format(
get_single_item_description(segments[0]),
)
range_item_description = range_item_description.replace(", ", "")
description += self.translate(", starting {0}").format(range_item_description)
return description
if "," in expression:
segments = expression.split(",")
description_content = ""
for i, segment in enumerate(segments):
if i > 0 and len(segments) > 2:
description_content += ","
if i < len(segments) - 1:
description_content += " "
if i > 0 and len(segments) > 1 and (i == len(segments) - 1 or len(segments) == 2):
description_content += self.translate(" and ")
if "-" in segment:
between_segment_description = self.generate_between_segment_description(
segment,
get_range_format,
get_single_item_description,
)
between_segment_description = between_segment_description.replace(", ", "")
description_content += between_segment_description
else:
description_content += get_single_item_description(segment)
return get_description_format(expression).format(description_content)
if "-" in expression:
return self.generate_between_segment_description(
expression,
get_between_description_format,
get_single_item_description,
)
return "?"
def generate_between_segment_description(
self,
between_expression: str,
get_between_description_format: Callable[[str], str],
get_single_item_description: Callable[[str], str],
) -> str:
"""Generates the between segment description
:param between_expression:
:param get_between_description_format:
:param get_single_item_description:
:return: The between segment description
"""
description = ""
between_segments = between_expression.split("-")
between_segment_1_description = get_single_item_description(between_segments[0])
between_segment_2_description = get_single_item_description(between_segments[1])
between_segment_2_description = between_segment_2_description.replace(":00", ":59")
between_description_format = get_between_description_format(between_expression)
description += between_description_format.format(between_segment_1_description, between_segment_2_description)
return description
def format_time(
self,
hour_expression: str,
minute_expression: str,
second_expression: str | None=None,
) -> str:
"""Given time parts, will construct a formatted time description
Args:
hour_expression: Hours part
minute_expression: Minutes part
second_expression: Seconds part
Returns:
Formatted time description
"""
hour = int(hour_expression)
period = ""
if self._options.use_24hour_time_format is False:
period = self.translate("PM") if (hour >= 12) else self.translate("AM")
if period:
# add preceding space
period = " " + period
if hour > 12:
hour -= 12
if hour == 0:
hour = 12
minute = str(int(minute_expression)) # Removes leading zero if any
second = ""
if second_expression is not None and second_expression:
second = "{}{}".format(":", str(int(second_expression)).zfill(2))
return f"{str(hour).zfill(2)}:{minute.zfill(2)}{second}{period}"
def transform_verbosity(self, description: str, *, use_verbose_format: bool = False) -> str:
"""Transforms the verbosity of the expression description by stripping verbosity from original description
Args:
description: The description to transform
use_verbose_format: If True, will leave description as it, if False, will strip verbose parts
Returns:
The transformed description with proper verbosity
"""
if not use_verbose_format:
description = description.replace(self.translate(", every minute"), "")
description = description.replace(self.translate(", every hour"), "")
description = description.replace(self.translate(", every day"), "")
description = re.sub(r", ?$", "", description)
return description
@staticmethod
def transform_case(description: str, case_type: CasingTypeEnum) -> str:
"""Transforms the case of the expression description, based on options
Args:
description: The description to transform
case_type: The casing type that controls the output casing
Returns:
The transformed description with proper casing
"""
if case_type == CasingTypeEnum.Sentence:
description = f"{description[0].upper()}{description[1:]}"
elif case_type == CasingTypeEnum.Title:
description = description.title()
else:
description = description.lower()
return description
@staticmethod
def number_to_day(day_number: int) -> str:
"""Returns localized day name by its CRON number
Args:
day_number: Number of a day
Returns:
Day corresponding to day_number
Raises:
IndexError: When day_number is not found
"""
try:
return [
calendar.day_name[6],
calendar.day_name[0],
calendar.day_name[1],
calendar.day_name[2],
calendar.day_name[3],
calendar.day_name[4],
calendar.day_name[5],
][day_number]
except IndexError as e:
msg = f"Day {day_number} is out of range!"
raise IndexError(msg) from e
def __str__(self) -> str:
return self.get_description()
def __repr__(self) -> str:
return self.get_description()
def get_description(expression: str, options: Options | None=None) -> str:
"""Generates a human readable string for the Cron Expression
Args:
expression: The cron expression string
options: Options to control the output description
Returns:
The cron expression description
"""
descriptor = ExpressionDescriptor(expression, options)
return descriptor.get_description(DescriptionTypeEnum.FULL)

View File

@@ -0,0 +1,226 @@
# The MIT License (MIT)
#
# Copyright (c) 2016 Adam Schubert
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import re
from typing import ClassVar
from .Exception import FormatError, MissingFieldError
from .Options import Options
class ExpressionParser:
_expression = ""
_options: Options
_cron_days: ClassVar[dict[int, str]] = {
0: "SUN",
1: "MON",
2: "TUE",
3: "WED",
4: "THU",
5: "FRI",
6: "SAT",
}
_cron_months: ClassVar[dict[int, str]] = {
1: "JAN",
2: "FEB",
3: "MAR",
4: "APR",
5: "MAY",
6: "JUN",
7: "JUL",
8: "AUG",
9: "SEP",
10: "OCT",
11: "NOV",
12: "DEC",
}
def __init__(self, expression: str, options: Options) -> None:
"""Initializes a new instance of the ExpressionParser class
Args:
expression: The cron expression string
options: Parsing options
"""
self._expression = expression
self._options = options
def parse(self) -> list[str]:
"""Parses the cron expression string
Returns:
A 7 part string array, one part for each component of the cron expression (seconds, minutes, etc.)
Raises:
MissingFieldException: if _expression is empty or None
FormatException: if _expression has wrong format
"""
# Initialize all elements of parsed array to empty strings
parsed = ["", "", "", "", "", "", ""]
if not self._expression:
msg = "ExpressionDescriptor.expression"
raise MissingFieldError(msg)
expression_parts_temp = self._expression.split()
expression_parts_temp_length = len(expression_parts_temp)
if expression_parts_temp_length < 5:
msg = f"Error: Expression only has {expression_parts_temp_length} parts. At least 5 part are required."
raise FormatError(msg)
if expression_parts_temp_length == 5:
# 5 part cron so shift array past seconds element
for i, expression_part_temp in enumerate(expression_parts_temp):
parsed[i + 1] = expression_part_temp
elif expression_parts_temp_length == 6:
# We will detect if this 6 part expression has a year specified and if so we will shift the parts and treat the
# first part as a minute part rather than a second part.
# Ways we detect:
# 1. Last part is a literal year (i.e. 2020)
# 2. 3rd or 5th part is specified as "?" (DOM or DOW)
year_regex = re.compile(r"\d{4}$")
is_year_with_no_seconds_part = bool(year_regex.search(expression_parts_temp[5])) or "?" in [expression_parts_temp[4], expression_parts_temp[2]]
for i, expression_part_temp in enumerate(expression_parts_temp):
if is_year_with_no_seconds_part:
# Shift parts over by one
parsed[i + 1] = expression_part_temp
else:
parsed[i] = expression_part_temp
elif expression_parts_temp_length == 7:
parsed = expression_parts_temp
else:
msg = f"Error: Expression has too many parts ({expression_parts_temp_length}). Expression must not have more than 7 parts."
raise FormatError(msg)
self.normalize_expression(parsed)
return parsed
def normalize_expression(self, expression_parts: list[str]) -> None:
"""Converts cron expression components into consistent, predictable formats.
Args:
expression_parts: A 7 part string array, one part for each component of the cron expression
Returns:
None
"""
# convert ? to * only for DOM and DOW
expression_parts[3] = expression_parts[3].replace("?", "*")
expression_parts[5] = expression_parts[5].replace("?", "*")
# convert 0/, 1/ to */
if expression_parts[0].startswith("0/"):
expression_parts[0] = expression_parts[0].replace("0/", "*/") # seconds
if expression_parts[1].startswith("0/"):
expression_parts[1] = expression_parts[1].replace("0/", "*/") # minutes
if expression_parts[2].startswith("0/"):
expression_parts[2] = expression_parts[2].replace("0/", "*/") # hours
if expression_parts[3].startswith("1/"):
expression_parts[3] = expression_parts[3].replace("1/", "*/") # DOM
if expression_parts[4].startswith("1/"):
expression_parts[4] = expression_parts[4].replace("1/", "*/") # Month
if expression_parts[5].startswith("1/"):
expression_parts[5] = expression_parts[5].replace("1/", "*/") # DOW
if expression_parts[6].startswith("1/"):
expression_parts[6] = expression_parts[6].replace("1/", "*/") # Years
# Adjust DOW based on dayOfWeekStartIndexZero option
def digit_replace(match: re.Match[str]) -> str:
match_value = match.group()
dow_digits = re.sub(r"\D", "", match_value)
dow_digits_adjusted = dow_digits
if self._options.day_of_week_start_index_zero:
if dow_digits == "7":
dow_digits_adjusted = "0"
else:
dow_digits_adjusted = str(int(dow_digits) - 1)
return match_value.replace(dow_digits, dow_digits_adjusted)
expression_parts[5] = re.sub(r"(^\d)|([^#/\s]\d)", digit_replace, expression_parts[5])
# Convert DOM '?' to '*'
if expression_parts[3] == "?":
expression_parts[3] = "*"
# convert SUN-SAT format to 0-6 format
for day_number in self._cron_days:
expression_parts[5] = expression_parts[5].upper().replace(self._cron_days[day_number], str(day_number))
# convert JAN-DEC format to 1-12 format
for month_number in self._cron_months:
expression_parts[4] = expression_parts[4].upper().replace(
self._cron_months[month_number], str(month_number))
# convert 0 second to (empty)
if expression_parts[0] == "0":
expression_parts[0] = ""
# If time interval is specified for seconds or minutes and next time part is single item, make it a "self-range" so
# the expression can be interpreted as an interval 'between' range.
# For example:
# 0-20/3 9 * * * => 0-20/3 9-9 * * * (9 => 9-9)
# */5 3 * * * => */5 3-3 * * * (3 => 3-3)
star_and_slash = ["*", "/"]
has_part_zero_star_and_slash = any(ext in expression_parts[0] for ext in star_and_slash)
has_part_one_star_and_slash = any(ext in expression_parts[1] for ext in star_and_slash)
has_part_two_special_chars = any(ext in expression_parts[2] for ext in ["*", "-", ",", "/"])
if not has_part_two_special_chars and (has_part_zero_star_and_slash or has_part_one_star_and_slash):
expression_parts[2] += f"-{expression_parts[2]}"
# Loop through all parts and apply global normalization
length = len(expression_parts)
for i in range(length):
# convert all '*/1' to '*'
if expression_parts[i] == "*/1":
expression_parts[i] = "*"
"""
Convert Month,DOW,Year step values with a starting value (i.e. not '*') to between expressions.
This allows us to reuse the between expression handling for step values.
For Example:
- month part '3/2' will be converted to '3-12/2' (every 2 months between March and December)
- DOW part '3/2' will be converted to '3-6/2' (every 2 days between Tuesday and Saturday)
"""
if "/" in expression_parts[i] and not any(exp in expression_parts[i] for exp in ["*", "-", ","]):
choices = {
4: "12",
5: "6",
6: "9999",
}
step_range_through = choices.get(i)
if step_range_through is not None:
parts = expression_parts[i].split("/")
expression_parts[i] = f"{parts[0]}-{step_range_through}/{parts[1]}"

View File

@@ -0,0 +1,628 @@
from __future__ import annotations
import re
from typing import ClassVar
from cron_descriptor import FormatError
class ExpressionValidator:
_cron_days: ClassVar[dict[int, str]] = {
0: "SUN",
1: "MON",
2: "TUE",
3: "WED",
4: "THU",
5: "FRI",
6: "SAT",
}
_cron_months: ClassVar[dict[int, str]] = {
1: "JAN",
2: "FEB",
3: "MAR",
4: "APR",
5: "MAY",
6: "JUN",
7: "JUL",
8: "AUG",
9: "SEP",
10: "OCT",
11: "NOV",
12: "DEC",
}
def validate(self, expression: str) -> None:
"""Parses the cron expression string
Returns:
A 7 part string array, one part for each component of the cron expression (seconds, minutes, etc.)
Raises:
MissingFieldException: if _expression is empty or None
FormatException: if _expression has wrong format
"""
# Initialize all elements of parsed array to empty strings
parsed = ["", "", "", "", "", "", ""]
expression_parts_temp = expression.split()
expression_parts_temp_length = len(expression_parts_temp)
if expression_parts_temp_length < 5:
msg = f"Error: Expression only has {expression_parts_temp_length} parts. At least 5 part are required."
raise FormatError(msg)
if expression_parts_temp_length == 5:
# 5 part cron so shift array past seconds element
for i, expression_part_temp in enumerate(expression_parts_temp):
parsed[i + 1] = expression_part_temp
elif expression_parts_temp_length == 6:
# We will detect if this 6 part expression has a year specified and if so we will shift the parts and treat the
# first part as a minute part rather than a second part.
# Ways we detect:
# 1. Last part is a literal year (i.e. 2020)
# 2. 3rd or 5th part is specified as "?" (DOM or DOW)
year_regex = re.compile(r"\d{4}$")
is_year_with_no_seconds_part = bool(year_regex.search(expression_parts_temp[5])) or "?" in [
expression_parts_temp[4], expression_parts_temp[2]]
for i, expression_part_temp in enumerate(expression_parts_temp):
if is_year_with_no_seconds_part:
# Shift parts over by one
parsed[i + 1] = expression_part_temp
else:
parsed[i] = expression_part_temp
elif expression_parts_temp_length == 7:
parsed = expression_parts_temp
else:
msg = f"Error: Expression has too many parts ({expression_parts_temp_length}). Expression must not have more than 7 parts."
raise FormatError(msg)
self._validate_expression(parsed, expression_parts_temp_length)
def _validate_expression(self, expression_parts: list[str], expr_length: int) -> None:
"""Validation for each expression fields
Args:
expression_parts: expression list
expr_length: length of the list
"""
"""
Apply different index for varying length of the expression parts as it is mutated by parse().
Does not validate the case for having both DOW,DOM value because it is already causing exception.
"""
if expr_length == 5:
self.second_minute(expression_parts[1], "Second and Minute")
self.hour(expression_parts[2], "Hour")
self.dayofmonth(expression_parts[3], "DayOfMonth")
self.month(expression_parts[4], "Month")
self.dayofweek(expression_parts[5], "DayOfWeek")
elif expr_length == 6:
year_regex = re.compile(r"\d{4}$")
if year_regex.search(expression_parts[6]) is None:
if expression_parts[0]:
self.second_minute(expression_parts[0], "Second and Minute")
self.second_minute(expression_parts[1], "Second and Minute")
self.hour(expression_parts[2], "Hour")
self.dayofmonth(expression_parts[3], "DayOfMonth")
self.month(expression_parts[4], "Month")
self.dayofweek(expression_parts[5], "DayOfWeek")
else:
self.second_minute(expression_parts[1], "Second and Minute")
self.hour(expression_parts[2], "Hour")
self.dayofmonth(expression_parts[3], "DayOfMonth")
self.month(expression_parts[4], "Month")
self.dayofweek(expression_parts[5], "DayOfWeek")
self.year(expression_parts[6], "Year")
else:
if expression_parts[0]:
self.second_minute(expression_parts[0], "Second and Minute")
self.second_minute(expression_parts[1], "Second and Minute")
self.hour(expression_parts[2], "Hour")
self.dayofmonth(expression_parts[3], "DayOfMonth")
self.month(expression_parts[4], "Month")
self.dayofweek(expression_parts[5], "DayOfWeek")
if expression_parts[6]:
self.year(expression_parts[6], "Year")
def second_minute(self, expr: str, prefix: str) -> None:
""" sec/min expressions (n : Number, s: String)
*
nn (1~59)
nn-nn
nn/nn
nn-nn/nn
*/nn
nn,nn,nn (Maximum 24 elements)
"""
mi, mx = (0, 59)
if re.match(r"\d{1,2}$", expr):
self.check_range(expr=expr, mi=mi, mx=mx, prefix=prefix)
elif re.search(r"[-*,/]", expr):
if expr == "*":
pass
elif re.match(r"\d{1,2}-\d{1,2}$", expr):
parts = expr.split("-")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=parts[0], ed=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\d{1,2}/\d{1,2}$", expr):
parts = expr.split("/")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\d{1,2}-\d{1,2}/\d{1,2}$", expr):
parts = expr.split("/")
fst_parts = parts[0].split("-")
self.check_range(expr=fst_parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=fst_parts[0], ed=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\*/\d{1,2}$", expr):
parts = expr.split("/")
self.check_range(type_="interval", expr=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"^(\d{1,2}|\d{1,2}-\d{1,2})(,\d{1,2}|,\d{1,2}-\d{1,2})+$", expr):
limit = 60
expr_ls = expr.split(",")
if len(expr_ls) > limit:
msg = f"({prefix}) Exceeded maximum number({limit}) of specified value. '{len(expr_ls)}' is provided"
raise FormatError(msg)
for n in expr_ls:
if "-" in n:
parts = n.split("-")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=parts[0], ed=parts[1], mi=mi, mx=mx, prefix=prefix)
else:
self.check_range(expr=n, mi=mi, mx=mx, prefix=prefix)
else:
msg = f"({prefix}) Illegal Expression Format '{expr}'"
raise FormatError(msg)
else:
msg = f"({prefix}) Illegal Expression Format '{expr}'"
raise FormatError(msg)
def hour(self, expr: str, prefix: str) -> None:
""" hour expressions (n : Number, s: String)
*
nn (1~23)
nn-nn
nn/nn
nn-nn/nn
*/nn
nn,nn,nn (Maximum 24 elements)
"""
mi, mx = (0, 23)
if re.match(r"\d{1,2}$", expr):
self.check_range(expr=expr, mi=mi, mx=mx, prefix=prefix)
elif re.search(r"[-*,/]", expr):
if expr == "*":
pass
elif re.match(r"\d{1,2}-\d{1,2}$", expr):
parts = expr.split("-")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=parts[0], ed=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\d{1,2}/\d{1,2}$", expr):
parts = expr.split("/")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\d{1,2}-\d{1,2}/\d{1,2}$", expr):
parts = expr.split("/")
fst_parts = parts[0].split("-")
self.check_range(expr=fst_parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=fst_parts[0], ed=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\*/\d{1,2}$", expr):
parts = expr.split("/")
self.check_range(type_="interval", expr=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"^(\d{1,2}|\d{1,2}-\d{1,2})(,\d{1,2}|,\d{1,2}-\d{1,2})+$", expr):
limit = 24
expr_ls = expr.split(",")
if len(expr_ls) > limit:
msg = f"({prefix}) Exceeded maximum number({limit}) of specified value. '{len(expr_ls)}' is provided"
raise FormatError(msg)
for n in expr_ls:
if "-" in n:
parts = n.split("-")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=parts[0], ed=parts[1], mi=mi, mx=mx, prefix=prefix)
else:
self.check_range(expr=n, mi=mi, mx=mx, prefix=prefix)
else:
msg = f"({prefix}) Illegal Expression Format '{expr}'"
raise FormatError(msg)
else:
msg = f"({prefix}) Illegal Expression Format '{expr}'"
raise FormatError(msg)
def dayofmonth(self, expr: str, prefix: str) -> None:
""" DAYOfMonth expressions (n : Number, s: String)
*
?
nn (1~31)
nn-nn
nn/nn
nn-nn/nn
*/nn
nn,nn,nn (Maximum 31 elements)
L-nn
LW
nW
"""
mi, mx = (1, 31)
if re.match(r"\d{1,2}$", expr):
self.check_range(expr=expr, mi=mi, mx=mx, prefix=prefix)
elif re.search(r"[-*,/?]", expr):
if expr in ("*", "?"):
pass
elif re.match(r"\d{1,2}-\d{1,2}$", expr):
parts = expr.split("-")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=parts[0], ed=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\d{1,2}/\d{1,2}$", expr):
parts = expr.split("/")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=0, mx=mx, prefix=prefix)
elif re.match(r"\d{1,2}-\d{1,2}/\d{1,2}$", expr):
parts = expr.split("/")
fst_parts = parts[0].split("-")
self.check_range(expr=fst_parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=fst_parts[0], ed=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=0, mx=mx, prefix=prefix)
elif re.match(r"\*/\d{1,2}$", expr):
parts = expr.split("/")
self.check_range(type_="interval", expr=parts[1], mi=0, mx=mx, prefix=prefix)
elif re.match(r"^\d{1,2}(,\d{1,2})+$", expr):
limit = 31
expr_ls = expr.split(",")
if len(expr_ls) > 31:
msg = f"({prefix}) Exceeded maximum number({limit}) of specified value. '{len(expr_ls)}' is provided"
raise FormatError(msg)
for dayofmonth in expr_ls:
self.check_range(expr=dayofmonth, mi=mi, mx=mx, prefix=prefix)
elif re.match(r"^([Ll])-(\d{1,2})$", expr):
parts = expr.split("-")
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
else:
msg = f"Illegal Expression Format '{expr}'"
raise FormatError(msg)
elif re.match(r"^([Ll])([Ww])?$", expr) or re.match(r"^([Ww])([Ll])?$", expr):
pass
elif re.match(r"^(\d{1,2})([wW])$", expr):
self.check_range(expr=expr[:-1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"^([wW])(\d{1,2})$", expr):
self.check_range(expr=expr[1:], mi=mi, mx=mx, prefix=prefix)
else:
msg = f"({prefix}) Illegal Expression Format '{expr}'"
raise FormatError(msg)
def month(self, expr: str, prefix: str) -> None:
""" month expressions (n : Number, s: String)
*
nn (1~12)
sss (JAN~DEC)
nn-nn
sss-sss
nn/nn
nn-nn/nn
*/nn
nn,nn,nn,nn-nn,sss-sss (Maximum 12 elements)
"""
mi, mx = (1, 12)
if re.match(r"\d{1,2}$", expr):
self.check_range(expr=expr, mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\D{3}$", expr):
matched_month = [m for m in self._cron_months.values() if expr == m]
if len(matched_month) == 0:
msg = f"Invalid Month value '{expr}'"
raise FormatError(msg)
elif re.search(r"[-*,/]", expr):
if expr == "*":
pass
elif re.match(r"\d{1,2}-\d{1,2}$", expr):
parts = expr.split("-")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=parts[0], ed=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\D{3}-\D{3}$", expr):
parts = expr.split("-")
cron_months = {v: k for (k, v) in self._cron_months.items()}
st_not_exist = parts[0] not in cron_months
ed_not_exist = parts[1] not in cron_months
if st_not_exist or ed_not_exist:
msg = f"Invalid Month value '{expr}'"
raise FormatError(msg)
self.compare_range(st=cron_months[parts[0]], ed=cron_months[parts[1]], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\d{1,2}/\d{1,2}$", expr):
parts = expr.split("/")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=0, mx=mx, prefix=prefix)
elif re.match(r"\d{1,2}-\d{1,2}/\d{1,2}$", expr):
parts = expr.split("/")
fst_parts = parts[0].split("-")
self.check_range(expr=fst_parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=fst_parts[0], ed=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=0, mx=12, prefix=prefix)
elif re.match(r"\*/\d{1,2}$", expr):
parts = expr.split("/")
self.check_range(type_="interval", expr=parts[1], mi=0, mx=12, prefix=prefix)
elif re.match(r"^\d{1,2}(,\d{1,2})+$", expr):
limit = 12
expr_ls = expr.split(",")
if len(expr_ls) > limit:
msg = f"({prefix}) Exceeded maximum number({limit}) of specified value. '{len(expr_ls)}' is provided"
raise FormatError(msg)
for month in expr_ls:
self.check_range(expr=month, mi=mi, mx=mx, prefix=prefix)
elif re.match(r"^((\d{1,2}|\D{3})|(\D{3}-\D{3})|(\d{1,2}-\d{1,2}))((,\d{1,2})+"
r"|(,\D{3})*|(,\d{1,2}-\d{1,2})*|(,\D{3}-\D{3})*)*$", expr):
"""
1st Capture group : digit{1~2}|nondigit{3}|nondigit{3}-nondigit{3}|digit{3}-digit{3}
2nd Capture group : same with 1st capture group but repeated.
"""
limit = 12
expr_ls = expr.split(",")
if len(expr_ls) > limit:
msg = f"({prefix}) Exceeded maximum number({limit}) of specified value. '{len(expr_ls)}' is provided"
raise FormatError(msg)
cron_months = {v: k for (k, v) in self._cron_months.items()}
for month in expr_ls:
if "-" in month:
parts = month.split("-")
if len(parts[0]) == 3:
self.check_range(expr=cron_months[parts[0].upper()], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=cron_months[parts[1].upper()], mi=mi, mx=mx, prefix=prefix)
else:
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=parts[0], ed=parts[1], mi=mi, mx=mx, prefix=prefix)
else:
cron_month = cron_months[month.upper()] if len(month) == 3 else month
self.check_range(expr=cron_month, mi=mi, mx=mx, prefix=prefix)
else:
msg = f"({prefix}) Illegal Expression Format '{expr}'"
raise FormatError(msg)
else:
msg = f"({prefix}) Illegal Expression Format '{expr}'"
raise FormatError(msg)
def dayofweek(self, expr: str, prefix: str) -> None:
""" DAYOfWeek expressions (n : Number, s: String)
*
?
n (0~7) - 0 and 7 used interchangeable as Sunday
sss (SUN~SAT)
n/n
n-n/n
*/n
n-n
sss-sss
n|sss,n|sss,n|sss,n-n,sss-sss (maximum 7 elements)
nL
n#n
"""
mi, mx = (0, 7)
if expr in ("*", "?"):
pass
elif re.match(r"\d{1}$", expr):
self.check_range(expr=expr, mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\D{3}$", expr):
cron_days = {v: k for (k, v) in self._cron_days.items()}
if expr.upper() in cron_days:
pass
else:
msg = f"Invalid value '{expr}'"
raise FormatError(msg)
elif re.match(r"\d{1}/\d{1}$", expr):
parts = expr.split("/")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=0, mx=mx, prefix=prefix)
elif re.match(r"\d{1}-\d{1}/\d{1}$", expr):
parts = expr.split("/")
fst_parts = parts[0].split("-")
self.check_range(expr=fst_parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=fst_parts[0], ed=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=0, mx=mx, prefix=prefix)
elif re.match(r"[*]/\d{1}$", expr):
parts = expr.split("/")
self.check_range(type_="interval", expr=parts[1], mi=0, mx=mx, prefix=prefix)
elif re.match(r"\d{1}-\d{1}$", expr):
parts = expr.split("-")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=parts[0], ed=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\D{3}-\D{3}$", expr):
parts = expr.split("-")
cron_days = {v: k for (k, v) in self._cron_days.items()}
try:
st_day = cron_days[parts[0].upper()]
ed_day = cron_days[parts[1].upper()]
except KeyError as e:
msg = f"({prefix}) Invalid value '{expr}'"
raise FormatError(msg) from e
self.compare_range(st=st_day, ed=ed_day, mi=mi, mx=mx, prefix=prefix, type_="dow")
elif re.match(r"^((\d{1}|\D{3})|(\D{3}-\D{3})|(\d{1}-\d{1}))"
r"((,\d{1})+|(,\D{3})*|(,\d{1}-\d{1})*|(,\D{3}-\D{3})*)*$", expr):
limit = 7
expr_ls = expr.split(",")
if len(expr_ls) > limit:
msg = f"({prefix}) Exceeded maximum number({limit}) of specified value. '{len(expr_ls)}' is provided"
raise FormatError(msg)
cron_days = {v: k for (k, v) in self._cron_days.items()}
for day in expr_ls:
if "-" in day:
parts = day.split("-")
if len(parts[0]) == 3:
self.check_range(expr=cron_days[parts[0].upper()], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=cron_days[parts[1].upper()], mi=mi, mx=mx, prefix=prefix)
else:
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=parts[0], ed=parts[1], mi=mi, mx=mx, prefix=prefix)
else:
# syncronize by add 1 to cron_days index
cron_day = cron_days[day.upper()] + 1 if len(day) == 3 else day
self.check_range(expr=cron_day, mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\d{1}([lL])$", expr):
parts = expr.upper().split("L")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\d#\d$", expr):
parts = expr.split("#")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=5, prefix=prefix, type_="dow")
elif re.match(r"\D{3}#\d$", expr):
parts = expr.split("#")
cron_days = {v: k for (k, v) in self._cron_days.items()}
try:
st_day = cron_days[parts[0].upper()]
except KeyError as e:
msg = f"({prefix}) Invalid value '{expr}'"
raise FormatError(msg) from e
self.check_range(expr=parts[1], mi=mi, mx=5, prefix=prefix, type_="dow")
else:
msg = f"({prefix}) Illegal Expression Format '{expr}'"
raise FormatError(msg)
def year(self, expr: str, prefix: str) -> None:
""" Year - valid expression (n : Number)
*
nnnn(1970~2099) - 4 digits number
nnnn-nnnn(1970~2099)
nnnn/nnn(0~129)
*/nnn(0~129)
nnnn,nnnn,nnnn(1970~2099) - maximum 86 elements
"""
mi, mx = (1970, 2099)
if re.match(r"\d{4}$", expr):
self.check_range(expr=expr, mi=mi, mx=mx, prefix=prefix)
elif re.search(r"[-*,/]", expr):
if expr == "*":
pass
elif re.match(r"\d{4}-\d{4}$", expr):
parts = expr.split("-")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=parts[0], ed=parts[1], mi=mi, mx=mx, prefix=prefix)
elif re.match(r"\d{4}/\d{1,3}$", expr):
parts = expr.split("/")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=0, mx=129, prefix=prefix)
elif re.match(r"\d{4}-\d{4}/\d{1,3}$", expr):
parts = expr.split("/")
fst_parts = parts[0].split("-")
self.check_range(expr=fst_parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=fst_parts[0], ed=fst_parts[1], mi=mi, mx=mx, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=0, mx=129, prefix=prefix)
elif re.match(r"\*/\d{1,3}$", expr):
parts = expr.split("/")
self.check_range(type_="interval", expr=parts[1], mi=0, mx=129, prefix=prefix)
elif re.match(r"\d{1}/\d{1,3}$", expr):
parts = expr.split("/")
self.check_range(expr=parts[0], mi=0, mx=129, prefix=prefix)
self.check_range(type_="interval", expr=parts[1], mi=0, mx=129, prefix=prefix)
elif re.match(r"^(\d{4}|\d{4}-\d{4})(,\d{4}|,\d{4}-\d{4})+$", expr):
limit = 84
expr_ls = expr.split(",")
if len(expr_ls) > limit:
msg = f"({prefix}) Exceeded maximum number({limit}) of specified value. '{len(expr_ls)}' is provided"
raise FormatError(msg)
for year in expr_ls:
if "-" in year:
parts = year.split("-")
self.check_range(expr=parts[0], mi=mi, mx=mx, prefix=prefix)
self.check_range(expr=parts[1], mi=mi, mx=mx, prefix=prefix)
self.compare_range(st=parts[0], ed=parts[1], mi=mi, mx=mx, prefix=prefix)
else:
self.check_range(expr=year, mi=mi, mx=mx, prefix=prefix)
else:
msg = f"({prefix}) Illegal Expression Format '{expr}'"
raise FormatError(msg)
else:
msg = f"({prefix}) Illegal Expression Format '{expr}'"
raise FormatError(msg)
def check_range(self, prefix: str, mi: int, mx: int, expr: str | int, type_: str| None=None) -> None:
"""
check if expression value within range of specified limit
"""
if int(expr) < mi or mx < int(expr):
if type_ is None:
msg = f"{prefix} values must be between {mi} and {mx} but '{expr}' is provided"
elif type_ == "interval":
msg = f"({prefix}) Accepted increment value range is {mi}~{mx} but '{expr}' is provided"
elif type_ == "dow":
msg = f"({prefix}) Accepted week value is {mi}~{mx} but '{expr}' is provided"
else:
msg = ""
raise FormatError(msg)
def compare_range(self, prefix: str, st: str | int, ed: str | int, mi:int, mx: int, type_: str | None=None) -> None:
""" check 2 expression values size
does not allow {st} value to be greater than {ed} value
"""
st_int = int(st)
ed_int = int(ed)
if st_int > ed_int:
if type_ is None:
msg = f"({prefix}) Invalid range '{st}-{ed}'. Accepted range is {mi}-{mx}"
elif type_ == "dow":
msg = f"({prefix}) Invalid range '{self._cron_days[st_int]}-{self._cron_days[ed_int]}'. Accepted range is {mi}-{mx}"
else:
msg = ""
raise FormatError(msg)

View File

@@ -0,0 +1,64 @@
# The MIT License (MIT)
#
# Copyright (c) 2016 Adam Schubert
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import annotations
import gettext
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
class FallBackNull(gettext.NullTranslations):
def gettext(self, _message: str) -> str:
# If we get here, that means that original translator failed, we will return empty string
return ""
class GetText:
"""Handles language translations
"""
def __init__(self, locale_code: str, locale_location: str | None = None) -> None:
"""Initialize GetText
:param locale_code selected locale
"""
try:
self.trans = self.load_locale(locale_code, locale_location)
except OSError:
logger.debug("Failed to find locale %s", locale_code)
logger.debug("Attempting to load en_US as fallback")
self.trans = self.load_locale("en_US")
# Add fallback that does not return original string, this is hack to add
# support for _("") or _("")
self.trans.add_fallback(FallBackNull())
def load_locale(self, locale_code: str, locale_location: str | None=None) -> gettext.GNUTranslations:
dir_path = Path(locale_location) if locale_location else Path(__file__).resolve().parent.joinpath("locale")
filename = dir_path.joinpath(f"{locale_code}.mo")
with filename.open("rb") as f:
trans = gettext.GNUTranslations(f)
logger.debug("%s Loaded", filename)
return trans

View File

@@ -0,0 +1,95 @@
# The MIT License (MIT)
#
# Copyright (c) 2016 Adam Schubert
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import annotations
import locale
import warnings
from .CasingTypeEnum import CasingTypeEnum
class Options:
"""
Options for parsing and describing a Cron Expression
"""
locale_code: str
casing_type: CasingTypeEnum
verbose: bool
day_of_week_start_index_zero: bool
use_24hour_time_format: bool
locale_location: str | None
_twelve_hour_locales = (
"en_US", # United States
"en_CA", # Canada (English)
"fr_CA", # Canada (French)
"en_AU", # Australia
"en_NZ", # New Zealand
"en_IE", # Ireland
"en_PH", # Philippines
"es_MX", # Mexico
"en_PK", # Pakistan
"en_IN", # India
"ar_SA", # Saudi Arabia
"bn_BD", # Bangladesh
"es_HN", # Honduras
"es_SV", # El Salvador
"es_NI", # Nicaragua
"ar_JO", # Jordan
"ar_EG", # Egypt
"es_CO", # Colombia
)
def __init__(self,
casing_type: CasingTypeEnum = CasingTypeEnum.Sentence,
*,
verbose: bool = False,
day_of_week_start_index_zero: bool = True,
use_24hour_time_format: bool | None = None,
locale_code: str | None = None,
locale_location: str | None = None,
) -> None:
self.casing_type = casing_type
self.verbose = verbose
self.day_of_week_start_index_zero = day_of_week_start_index_zero
self.locale_location = locale_location
if not locale_code:
# Autodetect
code, _encoding = locale.getlocale()
if not code:
warnings.warn(
"No system locale set. Falling back to 'en_US'. "
"Set LANG/LC_ALL or pass locale_code to override.",
stacklevel=2,
)
code = "en_US"
self.locale_code = code
else:
self.locale_code = locale_code
if use_24hour_time_format is None:
# Autodetect
self.use_24hour_time_format = self.locale_code not in self._twelve_hour_locales
else:
self.use_24hour_time_format = use_24hour_time_format

View File

@@ -0,0 +1,50 @@
# The MIT License (MIT)
#
# Copyright (c) 2016 Adam Schubert
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
class StringBuilder:
string: list[str]
"""
Builds string parts together acting like Java/.NET StringBuilder
"""
def __init__(self) -> None:
self.string = []
def append(self, string: str) -> None:
"""Appends non empty string
Args:
string: String to append
Returns:
None
"""
if string:
self.string.append(string)
def __str__(self) -> str:
return "".join(self.string)
def __len__(self) -> int:
return len(self.string)

View File

@@ -0,0 +1,42 @@
# The MIT License (MIT)
#
# Copyright (c) 2016 Adam Schubert
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from .CasingTypeEnum import CasingTypeEnum
from .DescriptionTypeEnum import DescriptionTypeEnum
from .Exception import FormatError, FormatException, MissingFieldError, MissingFieldException, WrongArgumentError, WrongArgumentException
from .ExpressionDescriptor import ExpressionDescriptor, get_description
from .Options import Options
__version__ = "2.0.6"
__all__ = [
"CasingTypeEnum",
"DescriptionTypeEnum",
"ExpressionDescriptor",
"FormatError",
"FormatException",
"MissingFieldError",
"MissingFieldException",
"Options",
"WrongArgumentError",
"WrongArgumentException",
"get_description",
]

View File

@@ -0,0 +1,25 @@
import argparse
from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, Options
parser = argparse.ArgumentParser(prog="cron_descriptor")
parser.add_argument("expression")
parser.add_argument("-c", "--casing",
choices=[v for v in vars(CasingTypeEnum)
if not v.startswith("_")],
default="Sentence")
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("-W", "--one-indexed-week", action="store_true")
parser.add_argument("-H", "--use-24-hour-time-format", action="store_true")
args = parser.parse_args()
options = Options()
options.casing_type = getattr(CasingTypeEnum, args.casing)
options.verbose = args.verbose
options.day_of_week_start_index_zero = not args.one_indexed_week
options.use_24hour_time_format = args.use_24_hour_time_format
descriptor = ExpressionDescriptor(args.expression, options)
print(str(descriptor))