2024-11-12 stan
/*
 * Initialise the message-publishing subsystem.
 *
 * Steps, in order:
 *   1. Build a Kafka configuration (broker list from settings.brokers,
 *      "queue.buffering.max.ms" = "1") and install the log and
 *      delivery-report callbacks.
 *   2. Create the producer handle `rk` from that configuration.
 *   3. Create the topic handles for "balances", "orders" and "deals".
 *   4. Create the three lists (deals, orders, balances), all sharing one
 *      list_type whose only non-zero field is the `free` callback.
 *   5. Arm and start `timer` so that on_timer is invoked.
 *
 * Returns 0 on success. On failure returns a negative value derived from
 * __LINE__ of the failing step; the exact number changes whenever this file
 * is edited, so callers should only rely on its sign.
 *
 * NOTE(review): the early returns before rd_kafka_new() leave `conf`
 * unreleased, and later failures leave `rk` / earlier topics / lists alive.
 * That is harmless if the caller exits on failure — confirm against caller.
 */
int init_message(void)
{
    char errstr[1024];

    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", settings.brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        log_stderr("Set kafka brokers: %s fail: %s", settings.brokers, errstr);
        return -__LINE__;
    }
    if (rd_kafka_conf_set(conf, "queue.buffering.max.ms", "1", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        /* NOTE(review): this logs settings.brokers, not the buffering value
         * that was rejected — looks like a copy-paste leftover; confirm. */
        log_stderr("Set kafka buffering: %s fail: %s", settings.brokers, errstr);
        return -__LINE__;
    }

    rd_kafka_conf_set_log_cb(conf, on_logger);
    rd_kafka_conf_set_dr_msg_cb(conf, on_delivery);

    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (rk == NULL) {
        log_stderr("Failed to create new producer: %s", errstr);
        return -__LINE__;
    }

    rkt_balances = rd_kafka_topic_new(rk, "balances", NULL);
    if (rkt_balances == NULL) {
        log_stderr("Failed to create topic object: %s", rd_kafka_err2str(rd_kafka_last_error()));
        return -__LINE__;
    }
    rkt_orders = rd_kafka_topic_new(rk, "orders", NULL);
    if (rkt_orders == NULL) {
        log_stderr("Failed to create topic object: %s", rd_kafka_err2str(rd_kafka_last_error()));
        return -__LINE__;
    }
    rkt_deals = rd_kafka_topic_new(rk, "deals", NULL);
    if (rkt_deals == NULL) {
        log_stderr("Failed to create topic object: %s", rd_kafka_err2str(rd_kafka_last_error()));
        return -__LINE__;
    }

    /* One list_type describes all three lists: everything zeroed except the
     * element-free callback. The address-of operator on `lt` is required
     * here; it had been mangled into a bare '<' by an HTML-entity decode. */
    list_type lt;
    memset(&lt, 0, sizeof(lt));
    lt.free = on_list_free;

    list_deals = list_create(&lt);
    if (list_deals == NULL) {
        return -__LINE__;
    }
    list_orders = list_create(&lt);
    if (list_orders == NULL) {
        return -__LINE__;
    }
    list_balances = list_create(&lt);
    if (list_balances == NULL) {
        return -__LINE__;
    }

    /* presumably a repeating 0.1-second timer — confirm against nw_timer_set */
    nw_timer_set(&timer, 0.1, true, on_timer, NULL);
    nw_timer_start(&timer);

    return 0;
}
/*
 * Fragment of an order-matching / settlement routine; the enclosing function
 * header and its surrounding statements are outside this view. The visible
 * part stamps both orders, allocates a deal id, optionally records the deal,
 * then applies the maker-side balance changes and finishes or updates the
 * maker order.
 * NOTE(review): `real` appears to gate history/message side effects only;
 * balance changes and order_finish run regardless — confirm intent.
 */
/* Both orders receive the same timestamp for this deal. */
taker->update_time = maker->update_time = current_timestamp();
/* The deal id is taken by pre-incrementing the deals_id_start counter. */
uint64_t deal_id = ++deals_id_start;
if (real) {
/* Record the deal in history and publish a deal message. The side passed to
 * push_deal_message is MARKET_ORDER_SIDE_ASK — presumably the taker's side. */
append_order_deal_history(taker->update_time, deal_id, taker, MARKET_ROLE_TAKER, maker, MARKET_ROLE_MAKER, price, amount, deal, ask_fee, bid_fee);
push_deal_message(taker->update_time, m->name, taker, maker, price, amount, ask_fee, bid_fee, MARKET_ORDER_SIDE_ASK, deal_id, m->stock, m->money);
}
/* Maker side: subtract `deal` of m->money from the BALANCE_TYPE_FREEZE balance. */
balance_sub(maker->user_id, BALANCE_TYPE_FREEZE, m->money, deal);
if (real) {
append_balance_trade_sub(maker, m->money, deal, price, amount);
}
/* Maker side: add `amount` of m->stock to the BALANCE_TYPE_AVAILABLE balance. */
balance_add(maker->user_id, BALANCE_TYPE_AVAILABLE, m->stock, amount);
if (real) {
append_balance_trade_add(maker, m->stock, amount, price, amount);
}
/* Only a strictly positive bid_fee is deducted, from the maker's available m->stock. */
if (mpd_cmp(bid_fee, mpd_zero, &mpd_ctx) > 0) {
balance_sub(maker->user_id, BALANCE_TYPE_AVAILABLE, m->stock, bid_fee);
if (real) {
append_balance_trade_fee(maker, m->stock, bid_fee, price, amount, maker->maker_fee);
}
}
/* maker->left == 0: presumably the maker order is fully filled — publish
 * FINISH (when real) and hand it to order_finish. Otherwise publish UPDATE. */
if (mpd_cmp(maker->left, mpd_zero, &mpd_ctx) == 0) {
if (real) {
push_order_message(ORDER_EVENT_FINISH, maker, m);
}
order_finish(real, m, maker);
} else {
if (real) {
push_order_message(ORDER_EVENT_UPDATE, maker, m);
}
}