Source code for ax.storage.sqa_store.timestamp

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import datetime

from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.types import Integer, TypeDecorator


[docs] class IntTimestamp(TypeDecorator): impl = Integer cache_ok = True # pyre-fixme[15]: `process_bind_param` overrides method defined in # `TypeDecorator` inconsistently.
[docs] def process_bind_param( self, value: datetime.datetime | None, dialect: Dialect ) -> int | None: if value is None: return None else: return int(value.timestamp())
[docs] def process_result_value( self, value: int | None, dialect: Dialect ) -> datetime.datetime | None: return None if value is None else datetime.datetime.fromtimestamp(value)