Source code for cassandra

# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging


class NullHandler(logging.Handler):

    def emit(self, record):
        pass

logging.getLogger('cassandra').addHandler(NullHandler())

__version_info__ = (3, 13, 0)
__version__ = '.'.join(map(str, __version_info__))


[docs]class ConsistencyLevel(object): """ Spcifies how many replicas must respond for an operation to be considered a success. By default, ``ONE`` is used for all operations. """ ANY = 0 """ Only requires that one replica receives the write *or* the coordinator stores a hint to replay later. Valid only for writes. """ ONE = 1 """ Only one replica needs to respond to consider the operation a success """ TWO = 2 """ Two replicas must respond to consider the operation a success """ THREE = 3 """ Three replicas must respond to consider the operation a success """ QUORUM = 4 """ ``ceil(RF/2)`` replicas must respond to consider the operation a success """ ALL = 5 """ All replicas must respond to consider the operation a success """ LOCAL_QUORUM = 6 """ Requires a quorum of replicas in the local datacenter """ EACH_QUORUM = 7 """ Requires a quorum of replicas in each datacenter """ SERIAL = 8 """ For conditional inserts/updates that utilize Cassandra's lightweight transactions, this requires consensus among all replicas for the modified data. """ LOCAL_SERIAL = 9 """ Like :attr:`~ConsistencyLevel.SERIAL`, but only requires consensus among replicas in the local datacenter. """ LOCAL_ONE = 10 """ Sends a request only to replicas in the local datacenter and waits for one response. """
ConsistencyLevel.value_to_name = { ConsistencyLevel.ANY: 'ANY', ConsistencyLevel.ONE: 'ONE', ConsistencyLevel.TWO: 'TWO', ConsistencyLevel.THREE: 'THREE', ConsistencyLevel.QUORUM: 'QUORUM', ConsistencyLevel.ALL: 'ALL', ConsistencyLevel.LOCAL_QUORUM: 'LOCAL_QUORUM', ConsistencyLevel.EACH_QUORUM: 'EACH_QUORUM', ConsistencyLevel.SERIAL: 'SERIAL', ConsistencyLevel.LOCAL_SERIAL: 'LOCAL_SERIAL', ConsistencyLevel.LOCAL_ONE: 'LOCAL_ONE' } ConsistencyLevel.name_to_value = { 'ANY': ConsistencyLevel.ANY, 'ONE': ConsistencyLevel.ONE, 'TWO': ConsistencyLevel.TWO, 'THREE': ConsistencyLevel.THREE, 'QUORUM': ConsistencyLevel.QUORUM, 'ALL': ConsistencyLevel.ALL, 'LOCAL_QUORUM': ConsistencyLevel.LOCAL_QUORUM, 'EACH_QUORUM': ConsistencyLevel.EACH_QUORUM, 'SERIAL': ConsistencyLevel.SERIAL, 'LOCAL_SERIAL': ConsistencyLevel.LOCAL_SERIAL, 'LOCAL_ONE': ConsistencyLevel.LOCAL_ONE } def consistency_value_to_name(value): return ConsistencyLevel.value_to_name[value] if value is not None else "Not Set"
[docs]class ProtocolVersion(object): """ Defines native protocol versions supported by this driver. """ V1 = 1 """ v1, supported in Cassandra 1.2-->2.2 """ V2 = 2 """ v2, supported in Cassandra 2.0-->2.2; added support for lightweight transactions, batch operations, and automatic query paging. """ V3 = 3 """ v3, supported in Cassandra 2.1-->3.x+; added support for protocol-level client-side timestamps (see :attr:`.Session.use_client_timestamp`), serial consistency levels for :class:`~.BatchStatement`, and an improved connection pool. """ V4 = 4 """ v4, supported in Cassandra 2.2-->3.x+; added a number of new types, server warnings, new failure messages, and custom payloads. Details in the `project docs <https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v4.spec>`_ """ V5 = 5 """ v5, in beta from 3.x+ """ SUPPORTED_VERSIONS = (V5, V4, V3, V2, V1) """ A tuple of all supported protocol versions """ BETA_VERSIONS = (V5,) """ A tuple of all beta protocol versions """ MIN_SUPPORTED = min(SUPPORTED_VERSIONS) """ Minimum protocol version supported by this driver. """ MAX_SUPPORTED = max(SUPPORTED_VERSIONS) """ Maximum protocol versioni supported by this driver. """
[docs] @classmethod def get_lower_supported(cls, previous_version): """ Return the lower supported protocol version. Beta versions are omitted. """ try: version = next(v for v in sorted(ProtocolVersion.SUPPORTED_VERSIONS, reverse=True) if v not in ProtocolVersion.BETA_VERSIONS and v < previous_version) except StopIteration: version = 0 return version
@classmethod def uses_int_query_flags(cls, version): return version >= cls.V5 @classmethod def uses_prepare_flags(cls, version): return version >= cls.V5 @classmethod def uses_prepared_metadata(cls, version): return version >= cls.V5 @classmethod def uses_error_code_map(cls, version): return version >= cls.V5 @classmethod def uses_keyspace_flag(cls, version): return version >= cls.V5
class SchemaChangeType(object): DROPPED = 'DROPPED' CREATED = 'CREATED' UPDATED = 'UPDATED' class SchemaTargetType(object): KEYSPACE = 'KEYSPACE' TABLE = 'TABLE' TYPE = 'TYPE' FUNCTION = 'FUNCTION' AGGREGATE = 'AGGREGATE' class SignatureDescriptor(object): def __init__(self, name, argument_types): self.name = name self.argument_types = argument_types @property def signature(self): """ function signature string in the form 'name([type0[,type1[...]]])' can be used to uniquely identify overloaded function names within a keyspace """ return self.format_signature(self.name, self.argument_types) @staticmethod def format_signature(name, argument_types): return "%s(%s)" % (name, ','.join(t for t in argument_types)) def __repr__(self): return "%s(%s, %s)" % (self.__class__.__name__, self.name, self.argument_types)
[docs]class UserFunctionDescriptor(SignatureDescriptor): """ Describes a User function by name and argument signature """ name = None """ name of the function """ argument_types = None """ Ordered list of CQL argument type names comprising the type signature """
[docs]class UserAggregateDescriptor(SignatureDescriptor): """ Describes a User aggregate function by name and argument signature """ name = None """ name of the aggregate """ argument_types = None """ Ordered list of CQL argument type names comprising the type signature """
[docs]class DriverException(Exception): """ Base for all exceptions explicitly raised by the driver. """ pass
[docs]class RequestExecutionException(DriverException): """ Base for request execution exceptions returned from the server. """ pass
[docs]class Unavailable(RequestExecutionException): """ There were not enough live replicas to satisfy the requested consistency level, so the coordinator node immediately failed the request without forwarding it to any replicas. """ consistency = None """ The requested :class:`ConsistencyLevel` """ required_replicas = None """ The number of replicas that needed to be live to complete the operation """ alive_replicas = None """ The number of replicas that were actually alive """ def __init__(self, summary_message, consistency=None, required_replicas=None, alive_replicas=None): self.consistency = consistency self.required_replicas = required_replicas self.alive_replicas = alive_replicas Exception.__init__(self, summary_message + ' info=' + repr({'consistency': consistency_value_to_name(consistency), 'required_replicas': required_replicas, 'alive_replicas': alive_replicas}))
[docs]class Timeout(RequestExecutionException): """ Replicas failed to respond to the coordinator node before timing out. """ consistency = None """ The requested :class:`ConsistencyLevel` """ required_responses = None """ The number of required replica responses """ received_responses = None """ The number of replicas that responded before the coordinator timed out the operation """ def __init__(self, summary_message, consistency=None, required_responses=None, received_responses=None): self.consistency = consistency self.required_responses = required_responses self.received_responses = received_responses Exception.__init__(self, summary_message + ' info=' + repr({'consistency': consistency_value_to_name(consistency), 'required_responses': required_responses, 'received_responses': received_responses}))
[docs]class ReadTimeout(Timeout): """ A subclass of :exc:`Timeout` for read operations. This indicates that the replicas failed to respond to the coordinator node before the configured timeout. This timeout is configured in ``cassandra.yaml`` with the ``read_request_timeout_in_ms`` and ``range_request_timeout_in_ms`` options. """ data_retrieved = None """ A boolean indicating whether the requested data was retrieved by the coordinator from any replicas before it timed out the operation """ def __init__(self, message, data_retrieved=None, **kwargs): Timeout.__init__(self, message, **kwargs) self.data_retrieved = data_retrieved
[docs]class WriteTimeout(Timeout): """ A subclass of :exc:`Timeout` for write operations. This indicates that the replicas failed to respond to the coordinator node before the configured timeout. This timeout is configured in ``cassandra.yaml`` with the ``write_request_timeout_in_ms`` option. """ write_type = None """ The type of write operation, enum on :class:`~cassandra.policies.WriteType` """ def __init__(self, message, write_type=None, **kwargs): Timeout.__init__(self, message, **kwargs) self.write_type = write_type
class CDCWriteFailure(RequestExecutionException): """ Hit limit on data in CDC folder, writes are rejected """ def __init__(self, message): Exception.__init__(self, message)
[docs]class CoordinationFailure(RequestExecutionException): """ Replicas sent a failure to the coordinator. """ consistency = None """ The requested :class:`ConsistencyLevel` """ required_responses = None """ The number of required replica responses """ received_responses = None """ The number of replicas that responded before the coordinator timed out the operation """ failures = None """ The number of replicas that sent a failure message """ error_code_map = None """ A map of inet addresses to error codes representing replicas that sent a failure message. Only set when `protocol_version` is 5 or higher. """ def __init__(self, summary_message, consistency=None, required_responses=None, received_responses=None, failures=None, error_code_map=None): self.consistency = consistency self.required_responses = required_responses self.received_responses = received_responses self.failures = failures self.error_code_map = error_code_map info_dict = { 'consistency': consistency_value_to_name(consistency), 'required_responses': required_responses, 'received_responses': received_responses, 'failures': failures } if error_code_map is not None: # make error codes look like "0x002a" formatted_map = dict((addr, '0x%04x' % err_code) for (addr, err_code) in error_code_map.items()) info_dict['error_code_map'] = formatted_map Exception.__init__(self, summary_message + ' info=' + repr(info_dict))
[docs]class ReadFailure(CoordinationFailure): """ A subclass of :exc:`CoordinationFailure` for read operations. This indicates that the replicas sent a failure message to the coordinator. """ data_retrieved = None """ A boolean indicating whether the requested data was retrieved by the coordinator from any replicas before it timed out the operation """ def __init__(self, message, data_retrieved=None, **kwargs): CoordinationFailure.__init__(self, message, **kwargs) self.data_retrieved = data_retrieved
[docs]class WriteFailure(CoordinationFailure): """ A subclass of :exc:`CoordinationFailure` for write operations. This indicates that the replicas sent a failure message to the coordinator. """ write_type = None """ The type of write operation, enum on :class:`~cassandra.policies.WriteType` """ def __init__(self, message, write_type=None, **kwargs): CoordinationFailure.__init__(self, message, **kwargs) self.write_type = write_type
[docs]class FunctionFailure(RequestExecutionException): """ User Defined Function failed during execution """ keyspace = None """ Keyspace of the function """ function = None """ Name of the function """ arg_types = None """ List of argument type names of the function """ def __init__(self, summary_message, keyspace, function, arg_types): self.keyspace = keyspace self.function = function self.arg_types = arg_types Exception.__init__(self, summary_message)
[docs]class RequestValidationException(DriverException): """ Server request validation failed """ pass
[docs]class ConfigurationException(RequestValidationException): """ Server indicated request errro due to current configuration """ pass
[docs]class AlreadyExists(ConfigurationException): """ An attempt was made to create a keyspace or table that already exists. """ keyspace = None """ The name of the keyspace that already exists, or, if an attempt was made to create a new table, the keyspace that the table is in. """ table = None """ The name of the table that already exists, or, if an attempt was make to create a keyspace, :const:`None`. """ def __init__(self, keyspace=None, table=None): if table: message = "Table '%s.%s' already exists" % (keyspace, table) else: message = "Keyspace '%s' already exists" % (keyspace,) Exception.__init__(self, message) self.keyspace = keyspace self.table = table
[docs]class InvalidRequest(RequestValidationException): """ A query was made that was invalid for some reason, such as trying to set the keyspace for a connection to a nonexistent keyspace. """ pass
[docs]class Unauthorized(RequestValidationException): """ The current user is not authorized to perform the requested operation. """ pass
[docs]class AuthenticationFailed(DriverException): """ Failed to authenticate. """ pass
[docs]class OperationTimedOut(DriverException): """ The operation took longer than the specified (client-side) timeout to complete. This is not an error generated by Cassandra, only the driver. """ errors = None """ A dict of errors keyed by the :class:`~.Host` against which they occurred. """ last_host = None """ The last :class:`~.Host` this operation was attempted against. """ def __init__(self, errors=None, last_host=None): self.errors = errors self.last_host = last_host message = "errors=%s, last_host=%s" % (self.errors, self.last_host) Exception.__init__(self, message)
class UnsupportedOperation(DriverException): """ An attempt was made to use a feature that is not supported by the selected protocol version. See :attr:`Cluster.protocol_version` for more details. """ pass