Elastic-Job——分布式定时任务框架

今天我们学习下Elastic-job解决以下问题:
我们开发定时任务一般都是使用quartz或者spring-task(ScheduledExecutorService),无论是使用quartz还是spring-task,我们都会至少遇到两个痛点:

  1. 不敢轻易跟着应用服务多节点部署,可能会重复多次执行而引发系统逻辑的错误。
  2. quartz的集群仅仅只是用来HA,节点数量的增加并不能给我们的每次执行效率带来提升,即不能实现水平扩展。

Elastic-job的主要包括以下功能:

  • 定时任务:基于成熟的定时任务作业框架Quartz cron表达式执行定时任务。
  • 作业注册中心:基于Zookeeper和其客户端Curator实现的全局作业注册控制中心。用于注册,控制和协调分布式作业执行。
  • 作业分片:将一个任务分片成为多个小任务项在多服务器上同时执行。
  • 弹性扩容缩容:运行中的作业服务器崩溃,或新增加n台作业服务器,作业框架将在下次作业执行前重新分片,不影响当前作业执行。
  • 支持多种作业执行模式:支持OneOff(类似Quartz原生作业),Perpetual(类似TbSchedule作业)和SequencePerpetual(类似TbSchedule和kafka的合并,但处理时维持分片顺序)三种作业模式。
  • 失效转移:运行中的作业服务器崩溃不会导致重新分片,只会在下次作业启动时分片。启用失效转移功能可以在本次作业执行过程中,监测其他作业服务器空闲,抓取未完成的孤儿分片项执行。
  • 运行时状态收集:监控作业运行时状态,统计最近一段时间处理的数据成功和失败数量,记录作业上次运行开始时间,结束时间和下次运行时间。
  • 作业停止,恢复和禁用:用于操作作业启停,并可以禁止某作业运行(上线时常用)。
  • 被错过执行的作业重触发:自动记录错过执行的作业,并在上次作业完成后自动触发。可参考Quartz的misfire。
  • 多线程快速处理数据:使用多线程处理抓取到的数据,提升吞吐量。
  • 幂等性:重复作业任务项判定,不重复执行已运行的作业任务项。由于开启幂等性需要监听作业运行状态,对瞬时反复运行的作业对性能有较大影响。
  • 容错处理:作业服务器与Zookeeper服务器通信失败则立即停止作业运行,防止作业注册中心将失效的分片分项配给其他作业服务器,而当前作业服务器仍在执行任务,导致重复执行。
  • Spring支持:支持spring容器,自定义命名空间,支持占位符。
  • 运维平台:提供运维界面,可以管理作业和注册中心。
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>

定时任务配置

regCenter.serverList=127.0.0.1:2181
regCenter.namespace=elastic-job-finance
stockJob.cron=0 0 13 ?
stockJob.shardingTotalCount=2
stockJob.shardingItemParameters=0=cytx0,1=cytx1

elastic-Job配置参数详细解释:https://blog.csdn.net/dhj199181/article/details/83088036


@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class RegistryCenterConfig {

@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
    return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}

}

@Component
@Service
public class GoldSettleJob implements SimpleJob {
    private static final Logger logger = LoggerFactory.getLogger(GoldSettleJob.class);

    @Autowired
    private GoldService goldService;

    @Override
    public void execute(ShardingContext shardingContext) {
        logger.info("GoldSettleJob:------Thread ID:{}, 任务总片数:{}, 当前分片项:{}, 当前参数:{}, 当前任务名称:{}, 当前任务参数:{}",
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter());

        int shardingItem = shardingContext.getShardingItem();
        int shardingTotalCount = shardingContext.getShardingTotalCount();

        //结算当天00:00:00时间节点之前的记录数据
        Date currentDate = DateUtil.paraseDate(DateUtil.yyyy_MM_dd + " 00:00:00", new Date());
        Map`<String, List<CytxGoldRecord>> map = goldService.getSettleGold(shardingTotalCount, shardingItem, currentDate);`
        goldService.settleGold(map, currentDate);
    }

}

@Configuration
public class JobConfig {

@Autowired
private ZookeeperRegistryCenter regCenter;

private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(),jobClass.getCanonicalName())).overwrite(true).build();
    }

    @Bean(initMethod = "init")
    public JobScheduler dataflowJobScheduler(final FinanceSettlementJob financeSettlementJob,
                                             @Value("${stockJob.cron}") final String cron,
                                             @Value("${stockJob.shardingTotalCount}") final int shardingTotalCount,
                                             @Value("${stockJob.shardingItemParameters}") final String shardingItemParameters) {
        MyElasticJobListener elasticJobListener = new MyElasticJobListener();
        return new SpringJobScheduler(financeSettlementJob, regCenter, getLiteJobConfiguration(financeSettlementJob.getClass(), cron, shardingTotalCount, shardingItemParameters), elasticJobListener);
    }
}

public class MyElasticJobListener implements ElasticJobListener {
    private static final Logger logger = LoggerFactory.getLogger(MyElasticJobListener.class);

    private long beginTime = 0;

    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        beginTime = System.currentTimeMillis();
        logger.info("===>{} JOB BEGIN TIME: {} <===",shardingContexts.getJobName(), DateUtil.getDate(beginTime, DateUtil.yyyy_MM_dd_HH_mm_ss));
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        long endTime = System.currentTimeMillis();
        logger.info("===>{} JOB END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateUtil.getDate(endTime, DateUtil.yyyy_MM_dd_HH_mm_ss), endTime - beginTime);
    }
}

<select id="getSettleGoldRecordByParam" resultMap="BaseResultMap">
  select user_id, ifnull(sum(num), 0) as num
  from cytx_gold_record
  where finish_time is NULL and #{settleDate} > create_time and type = #{type} and mod(user_id,#{shardingTotalCount})=#{shardingItem} GROUP BY user_id
</select>
讨论数量: 1

牛皮

4年前

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!