Skip to content

Commit

Permalink
Merge pull request #3 from dushibaiyu/master
Browse files Browse the repository at this point in the history
up rdkafka to 0.91
  • Loading branch information
dushibaiyu authored Jun 16, 2016
2 parents d9d36e0 + 4901572 commit c27fa3e
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 52 deletions.
131 changes: 118 additions & 13 deletions c/rdkafka.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ extern "C" {
#include <basetsd.h>
typedef SSIZE_T ssize_t;
#define RD_UNUSED
#define RD_INLINE __inline
#define RD_DEPRECATED
#undef RD_EXPORT
#ifdef LIBRDKAFKA_EXPORTS
Expand All @@ -68,6 +69,7 @@ typedef SSIZE_T ssize_t;

#else
#define RD_UNUSED __attribute__((unused))
#define RD_INLINE inline
#define RD_EXPORT
#define RD_DEPRECATED __attribute__((deprecated))
#endif
Expand Down Expand Up @@ -96,7 +98,7 @@ typedef SSIZE_T ssize_t;
* @remark This value should only be used during compile time,
* for runtime checks of version use rd_kafka_version()
*/
#define RD_KAFKA_VERSION 0x00090100
#define RD_KAFKA_VERSION 0x000901ff

/**
* @brief Returns the librdkafka version as integer.
Expand Down Expand Up @@ -141,6 +143,19 @@ typedef enum rd_kafka_type_t {
} rd_kafka_type_t;


/**
* @enum Timestamp types
*
* @sa rd_kafka_message_timestamp()
*/
typedef enum rd_kafka_timestamp_type_t {
RD_KAFKA_TIMESTAMP_NOT_AVAILABLE, /**< Timestamp not available */
RD_KAFKA_TIMESTAMP_CREATE_TIME, /**< Message creation time */
RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME /**< Log append time */
} rd_kafka_timestamp_type_t;



/**
* @brief Retrieve supported debug contexts for use with the \c \"debug\"
* configuration property. (runtime)
Expand Down Expand Up @@ -252,6 +267,8 @@ typedef enum {
RD_KAFKA_RESP_ERR__AUTHENTICATION = -169,
/** No stored offset */
RD_KAFKA_RESP_ERR__NO_OFFSET = -168,
/** Outdated */
RD_KAFKA_RESP_ERR__OUTDATED = -167,
/** End internal error codes */
RD_KAFKA_RESP_ERR__END = -100,

Expand Down Expand Up @@ -322,6 +339,14 @@ typedef enum {
RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED = 30,
/** Cluster authorization failed */
RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
/** Invalid timestamp */
RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP = 32,
/** Unsupported SASL mechanism */
RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM = 33,
/** Illegal SASL state */
RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE = 34,
/** Unuspported version */
RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION = 35,

RD_KAFKA_RESP_ERR_END_ALL,
} rd_kafka_resp_err_t;
Expand Down Expand Up @@ -530,6 +555,37 @@ rd_kafka_topic_partition_list_add_range (rd_kafka_topic_partition_list_t



/**
* @brief Delete partition from list.
*
* @param rktparlist List to modify
* @param topic Topic name to match
* @param partition Partition to match
*
* @returns 1 if partition was found (and removed), else 0.
*
* @remark Any held indices to elems[] are unusable after this call returns 1.
*/
RD_EXPORT
int
rd_kafka_topic_partition_list_del (rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition);


/**
* @brief Delete partition from list by elems[] index.
*
* @returns 1 if partition was found (and removed), else 0.
*
* @sa rd_kafka_topic_partition_list_del()
*/
RD_EXPORT
int
rd_kafka_topic_partition_list_del_by_idx (
rd_kafka_topic_partition_list_t *rktparlist,
int idx);


/**
* @brief Make a copy of an existing list.
*
Expand Down Expand Up @@ -585,9 +641,9 @@ rd_kafka_topic_partition_list_find (rd_kafka_topic_partition_list_t *rktparlist,

/**
* @brief A Kafka message as returned by the \c rd_kafka_consume*() family
* of functions.
* of functions as well as provided to the Producer \c dr_msg_cb().
*
* This object has two purposes:
* For the consumer this object has two purposes:
* - provide the application with a consumed message. (\c err == 0)
* - report per-topic+partition consumer errors (\c err != 0)
*
Expand All @@ -600,7 +656,8 @@ typedef struct rd_kafka_message_s {
rd_kafka_resp_err_t err; /**< Non-zero for error signaling. */
rd_kafka_topic_t *rkt; /**< Topic */
int32_t partition; /**< Partition */
void *payload; /**< Depends on the value of \c err :
void *payload; /**< Producer: original message payload.
* Consumer: Depends on the value of \c err :
* - \c err==0: Message payload.
* - \c err!=0: Error string */
size_t len; /**< Depends on the value of \c err :
Expand Down Expand Up @@ -640,7 +697,7 @@ void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage);
* @brief Returns the error string for an errored rd_kafka_message_t or NULL if
* there was no error.
*/
static __inline const char *
static RD_INLINE const char *
RD_UNUSED
rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage) {
if (!rkmessage->err)
Expand All @@ -652,6 +709,25 @@ rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage) {
return rd_kafka_err2str(rkmessage->err);
}



/**
* @brief Returns the message timestamp for a consumed message.
*
* The timestamp is the number of milliseconds since the epoch (UTC).
*
* \p tstype is updated to indicate the type of timestamp.
*
* @returns message timestamp, or -1 if not available.
*
* @remark Message timestamps require broker version 0.10.0 or later.
*/
RD_EXPORT
int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
rd_kafka_timestamp_type_t *tstype);



/**@}*/


Expand Down Expand Up @@ -805,7 +881,11 @@ void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
* such as fetching offsets from an alternate location (on assign)
* or manually committing offsets (on revoke).
*
* The following example show's the application's responsibilities:
* @remark The \p partitions list is destroyed by librdkafka on return
* return from the rebalance_cb and must not be freed or
* saved by the application.
*
* The following example shows the application's responsibilities:
* @code
* static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
* rd_kafka_topic_partition_list_t *partitions,
Expand Down Expand Up @@ -885,15 +965,16 @@ void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf,
/**
* @brief Set throttle callback.
*
* The throttle callback is used in conjunction with
* \c quota.support.enable=true to forward broker throttle times to the
* The throttle callback is used to forward broker throttle times to the
* application for Produce and Fetch (consume) requests.
*
* Callbacks are triggered whenever a non-zero throttle time is returned by
* the broker, or when the throttle time drops back to zero.
*
* An application must call rd_kafka_poll() or rd_kafka_consumer_poll() at
* regular intervals to serve queued callbacks.
*
* @remark Requires broker version 0.9.0 or later.
*/
RD_EXPORT
void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
Expand Down Expand Up @@ -1604,7 +1685,7 @@ int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition,
* all messages currently in the local queue.
*
* NOTE: To enforce synchronisation this call will block until the internal
* fetcher has terminated and offsets are commited to configured
* fetcher has terminated and offsets are committed to configured
* storage method.
*
* The application needs to be stop all consumers before calling
Expand Down Expand Up @@ -1787,7 +1868,7 @@ int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu,
/**
* @brief Store offset \p offset for topic \p rkt partition \p partition.
*
* The offset will be commited (written) to the offset store according
* The offset will be committed (written) to the offset store according
* to \c `auto.commit.interval.ms`.
*
* @remark \c `auto.commit.enable` must be set to "false" when using this API.
Expand Down Expand Up @@ -1873,6 +1954,7 @@ rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms);
* @remark This call will block until the consumer has revoked its assignment,
* calling the \c rebalance_cb if it is configured, committed offsets
* to broker, and left the consumer group.
* The maximum blocking time is roughly limited to session.timeout.ms.
*
* @returns An error code indicating if the consumer close was succesful
* or not.
Expand Down Expand Up @@ -1939,7 +2021,30 @@ rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,


/**
* @brief Retrieve committed positions (offsets) for topics+partitions.
* @brief Retrieve committed offsets for topics+partitions.
*
* The \p offset field of each requested partition will either be set to
* stored offset or to RD_KAFKA_OFFSET_INVALID in case there was no stored
* offset for that partition.
*
* @returns RD_KAFKA_RESP_ERR_NO_ERROR on success in which case the
* \p offset or \p err field of each \p partitions' element is filled
* in with the stored offset, or a partition specific error.
* Else returns an error code.
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_committed (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *partitions,
int timeout_ms);



/**
* @brief Retrieve current positions (offsets) for topics+partitions.
*
* The \p offset field of each requested partition will be set to the offset
* of the last consumed message + 1, or RD_KAFKA_OFFSET_INVALID in case there was
* no previous message.
*
* @returns RD_KAFKA_RESP_ERR_NO_ERROR on success in which case the
* \p offset or \p err field of each \p partitions' element is filled
Expand All @@ -1948,8 +2053,8 @@ rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_position (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *partitions,
int timeout_ms);
rd_kafka_topic_partition_list_t *partitions);


/**@}*/

Expand Down
Loading

0 comments on commit c27fa3e

Please sign in to comment.