Saturday, August 24, 2024

Leader Election

-- One row per node taking part in leader election.
-- The application looks a node up by (HostName, Port) and treats the row with
-- IsLeader = 1 and the most recent LastHeartbeat as the current leader.
CREATE TABLE LeaderElection (
    Id            INT IDENTITY(1, 1) NOT NULL,
    HostName      NVARCHAR(255)      NOT NULL,
    Port          INT                NOT NULL,
    IsLeader      BIT                NOT NULL,
    -- Written by the application as a UTC instant. DATETIME2 keeps the full
    -- precision of the value instead of rounding to ~3 ms like DATETIME.
    LastHeartbeat DATETIME2          NOT NULL,

    CONSTRAINT PK_LeaderElection PRIMARY KEY (Id),
    -- A node is identified by host + port; two concurrent first-time
    -- registrations of the same node must not both succeed.
    CONSTRAINT UQ_LeaderElection_HostName_Port UNIQUE (HostName, Port)
);

 

/// <summary>
/// Entity mirroring one row of the LeaderElection table: a node, whether it
/// is flagged as leader, and when it last reported a heartbeat.
/// </summary>
public class LeaderElection
{
    /// <summary>Primary key of the row.</summary>
    public int Id { get; set; }

    /// <summary>Host name of the node this row belongs to.</summary>
    public string HostName { get; set; }

    /// <summary>Port of the node this row belongs to.</summary>
    public int Port { get; set; }

    /// <summary>True when the node is flagged as leader.</summary>
    public bool IsLeader { get; set; }

    /// <summary>Time of the node's most recent heartbeat.</summary>
    public DateTime LastHeartbeat { get; set; }
}



/// <summary>
/// Database-backed leader election. Every 5 seconds the node checks the
/// LeaderElections table: if a leader with a heartbeat younger than 10 seconds
/// exists it is left alone (or, when it is this node, its heartbeat is
/// renewed); otherwise this node claims leadership.
/// </summary>
/// <remarks>
/// NOTE(review): the service keeps a single DbContext for its whole lifetime
/// and uses it from timer callbacks. DbContext is not thread-safe, so passes
/// are serialised here, but confirm the DI lifetime of the context that is
/// injected. The election itself runs in a default-isolation transaction, so
/// two nodes can still claim leadership in the same instant; readers resolve
/// that by taking the leader row with the newest heartbeat.
/// </remarks>
public class LeaderElectionService : IDisposable
{
    // How often leadership is re-evaluated and the leader's heartbeat renewed.
    private static readonly TimeSpan CheckInterval = TimeSpan.FromSeconds(5);

    // A leader whose last heartbeat is at least this old is considered failed.
    private static readonly TimeSpan LeaderTimeout = TimeSpan.FromSeconds(10);

    private readonly ApplicationDbContext _dbContext;
    private readonly IHttpContextAccessor _httpContextAccessor;
    private string _hostName;
    private int _port;
    private Timer _timer;

    // 1 while a CheckLeadership pass is running, 0 otherwise.
    private int _checkInProgress;

    /// <summary>
    /// Creates the service and resolves this node's host name and port.
    /// </summary>
    /// <param name="dbContext">Context exposing the LeaderElections set.</param>
    /// <param name="httpContextAccessor">Used to read host/port from the current request, if any.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public LeaderElectionService(ApplicationDbContext dbContext, IHttpContextAccessor httpContextAccessor)
    {
        _dbContext = dbContext ?? throw new ArgumentNullException(nameof(dbContext));
        _httpContextAccessor = httpContextAccessor ?? throw new ArgumentNullException(nameof(httpContextAccessor));

        InitializeHostAndPort();
    }

    /// <summary>
    /// Determines the identity (host name + port) under which this node
    /// registers itself: from the current HTTP request when there is one,
    /// otherwise from the HOSTNAME / PORT environment variables.
    /// </summary>
    private void InitializeHostAndPort()
    {
        var httpContext = _httpContextAccessor.HttpContext;
        if (httpContext != null)
        {
            var request = httpContext.Request;
            _hostName = request.Host.Host;

            // No explicit port in the Host header means the scheme's default
            // port, which is 443 (not 80) for https requests.
            _port = request.Host.Port ?? (request.IsHttps ? 443 : 80);
        }
        else
        {
            _hostName = Environment.GetEnvironmentVariable("HOSTNAME") ?? "localhost";
            _port = int.TryParse(Environment.GetEnvironmentVariable("PORT"), out int port) ? port : 5000;
        }
    }

    /// <summary>
    /// Starts the periodic leadership check; the first check runs immediately.
    /// Calling it again replaces (and disposes) the previous timer.
    /// </summary>
    public void Start()
    {
        var replaced = Interlocked.Exchange(
            ref _timer,
            new Timer(CheckLeadership, null, TimeSpan.Zero, CheckInterval));
        replaced?.Dispose();
    }

    /// <summary>Stops the periodic leadership check.</summary>
    public void Dispose()
    {
        Interlocked.Exchange(ref _timer, null)?.Dispose();
    }

    /// <summary>
    /// Timer callback: renews this node's heartbeat when it is the live
    /// leader, or claims leadership when no live leader exists.
    /// </summary>
    /// <param name="state">Unused timer state.</param>
    private void CheckLeadership(object state)
    {
        // Timer callbacks may overlap when a pass takes longer than the
        // interval; the shared DbContext must not be used concurrently, so a
        // tick that finds another pass running is skipped.
        if (Interlocked.CompareExchange(ref _checkInProgress, 1, 0) != 0)
        {
            return;
        }

        try
        {
            // Queries on a long-lived context return the already-tracked
            // entity instances without refreshing their values, which would
            // freeze other nodes' heartbeats at the first value seen. Start
            // every pass with an empty change tracker.
            _dbContext.ChangeTracker.Clear();

            using (var transaction = _dbContext.Database.BeginTransaction())
            {
                var now = DateTime.UtcNow;

                var existingLeader = _dbContext.LeaderElections
                    .OrderByDescending(le => le.LastHeartbeat)
                    .FirstOrDefault(le => le.IsLeader);

                var leaderIsAlive = existingLeader != null
                    && now - existingLeader.LastHeartbeat < LeaderTimeout;

                if (leaderIsAlive)
                {
                    // Only the leader itself renews the heartbeat; followers
                    // just observe.
                    if (IsSelf(existingLeader))
                    {
                        existingLeader.LastHeartbeat = now;
                        _dbContext.SaveChanges();
                    }
                }
                else
                {
                    ClaimLeadership(now);
                }

                transaction.Commit();
            }
        }
        catch (Exception ex)
        {
            // An exception escaping a timer callback would terminate the
            // process. The uncommitted transaction is rolled back when it is
            // disposed; the next tick retries.
            Console.Error.WriteLine($"Leader election check failed: {ex}");
        }
        finally
        {
            Interlocked.Exchange(ref _checkInProgress, 0);
        }
    }

    /// <summary>
    /// Marks this node as leader (inserting its row if needed) and clears the
    /// leader flag on every other row so superseded leaders do not stay flagged.
    /// </summary>
    /// <param name="now">UTC timestamp recorded as the new heartbeat.</param>
    private void ClaimLeadership(DateTime now)
    {
        var flaggedLeaders = _dbContext.LeaderElections
            .Where(le => le.IsLeader)
            .ToList();

        foreach (var row in flaggedLeaders)
        {
            if (!IsSelf(row))
            {
                row.IsLeader = false;
            }
        }

        var self = _dbContext.LeaderElections
            .FirstOrDefault(le => le.HostName == _hostName && le.Port == _port);

        if (self == null)
        {
            self = new LeaderElection
            {
                HostName = _hostName,
                Port = _port,
                IsLeader = true,
                LastHeartbeat = now
            };
            _dbContext.LeaderElections.Add(self);
        }
        else
        {
            self.IsLeader = true;
            self.LastHeartbeat = now;
        }

        _dbContext.SaveChanges();
    }

    /// <summary>True when <paramref name="row"/> describes this node.</summary>
    private bool IsSelf(LeaderElection row)
    {
        return row.HostName == _hostName && row.Port == _port;
    }
}


Sunday, August 18, 2024

Distributed Task Runner

 Distributed Task Runner


Manager

startup.cs

using ManagerNode;

using Microsoft.AspNetCore.Builder;

using Microsoft.Extensions.DependencyInjection;

using Microsoft.Extensions.Hosting;

using Quartz;

using Quartz.Impl;

using Quartz.Spi;


/// <summary>
/// Startup for the manager node: registers MVC controllers, the job
/// scheduling/publishing services and the Quartz plumbing, then wires up
/// routing.
/// </summary>
public class Startup
{
    /// <summary>Registers the manager's services with the DI container.</summary>
    /// <param name="services">Service collection to populate.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();

        services
            // Application services
            .AddSingleton<JobSchedulerService>()
            .AddSingleton<RabbitMqJobPublisher>()
            // Quartz: job factory backed by the container, standard scheduler factory
            .AddSingleton<IJobFactory, SingletonJobFactory>()
            .AddSingleton<ISchedulerFactory, StdSchedulerFactory>()
            // The job type itself, so the job factory can resolve it
            .AddSingleton<DistributedQuartzJob>()
            // Hosted service that starts and stops the scheduler with the host
            .AddHostedService<QuartzHostedService>();
    }

    /// <summary>Builds the HTTP request pipeline.</summary>
    /// <param name="app">Application builder.</param>
    /// <param name="env">Hosting environment; enables the developer exception page in Development.</param>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        var isDevelopment = env.IsDevelopment();
        if (isDevelopment)
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();
        app.UseEndpoints(endpoints => endpoints.MapControllers());
    }
}


/// <summary>
/// Quartz job factory that resolves job instances from the DI container
/// instead of constructing them itself.
/// </summary>
public class SingletonJobFactory : IJobFactory
{
    private readonly IServiceProvider _serviceProvider;

    /// <param name="serviceProvider">Container the job types are registered in.</param>
    /// <exception cref="ArgumentNullException"><paramref name="serviceProvider"/> is null.</exception>
    public SingletonJobFactory(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }

    /// <summary>Returns the container's instance of the job type being fired.</summary>
    /// <param name="bundle">Describes the trigger that fired and the job to run.</param>
    /// <param name="scheduler">Scheduler requesting the job (unused).</param>
    /// <exception cref="InvalidOperationException">The job type is not registered in the container.</exception>
    public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
    {
        var jobType = bundle.JobDetail.JobType;

        // GetService(...) as IJob would hand a null job to the scheduler when
        // the type is not registered; fail here with a message naming the
        // missing registration instead.
        return (IJob)_serviceProvider.GetRequiredService(jobType);
    }

    /// <summary>No-op: job instances are owned by the container, not by this factory.</summary>
    public void ReturnJob(IJob job) { }
}


/// <summary>
/// Hosted service that obtains the Quartz scheduler, plugs in the DI-backed
/// job factory and starts it with the host; shuts it down on host stop.
/// </summary>
public class QuartzHostedService : IHostedService
{
    private readonly ISchedulerFactory _schedulerFactory;
    private readonly IJobFactory _jobFactory;
    private readonly JobSchedulerService _jobSchedulerService;
    private IScheduler _scheduler;

    /// <param name="schedulerFactory">Factory the scheduler is obtained from.</param>
    /// <param name="jobFactory">Job factory assigned to the scheduler.</param>
    /// <param name="jobSchedulerService">Stored but not used by this class.</param>
    public QuartzHostedService(ISchedulerFactory schedulerFactory, IJobFactory jobFactory, JobSchedulerService jobSchedulerService)
    {
        _jobSchedulerService = jobSchedulerService;
        _jobFactory = jobFactory;
        _schedulerFactory = schedulerFactory;
    }

    /// <summary>Obtains the scheduler, assigns the job factory and starts it.</summary>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var scheduler = await _schedulerFactory.GetScheduler(cancellationToken);
        _scheduler = scheduler;
        scheduler.JobFactory = _jobFactory;

        // Jobs that must exist from startup would be scheduled before this call.
        await scheduler.Start(cancellationToken);
    }

    /// <summary>Shuts the scheduler down if it was ever obtained.</summary>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_scheduler is null)
        {
            return;
        }

        await _scheduler.Shutdown(cancellationToken);
    }
}


program.cs


/// <summary>Entry point of the manager node.</summary>
public class Program
{
    /// <summary>Builds the host and runs it until shutdown.</summary>
    public static void Main(string[] args)
    {
        var host = CreateHostBuilder(args).Build();
        host.Run();
    }

    /// <summary>Creates the default host builder with <see cref="Startup"/> as the web startup class.</summary>
    public static IHostBuilder CreateHostBuilder(string[] args)
    {
        var builder = Host.CreateDefaultBuilder(args);
        return builder.ConfigureWebHostDefaults(webBuilder => webBuilder.UseStartup<Startup>());
    }
}


JobSchedulerService.cs

using ManagerNode;

using Quartz;

using Quartz.Impl;

using RabbitMQ.Client;

using System.Text;


/// <summary>
/// Schedules one-shot <see cref="DistributedQuartzJob"/> executions on the
/// Quartz scheduler.
/// </summary>
public class JobSchedulerService
{
    private readonly ISchedulerFactory _schedulerFactory;

    /// <param name="schedulerFactory">Factory the scheduler is obtained from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="schedulerFactory"/> is null.</exception>
    public JobSchedulerService(ISchedulerFactory schedulerFactory)
    {
        // The scheduler is resolved asynchronously on first use rather than
        // by blocking on .Result here, which ties up a thread during
        // container construction and wraps failures in AggregateException.
        _schedulerFactory = schedulerFactory ?? throw new ArgumentNullException(nameof(schedulerFactory));
    }

    /// <summary>
    /// Schedules a <see cref="DistributedQuartzJob"/> to fire once at
    /// <paramref name="startTime"/>.
    /// </summary>
    /// <param name="jobName">Name part of the job identity.</param>
    /// <param name="jobGroup">Group part of the job identity.</param>
    /// <param name="triggerName">Name part of the trigger identity.</param>
    /// <param name="triggerGroup">Group part of the trigger identity.</param>
    /// <param name="startTime">Time at which the trigger fires.</param>
    /// <param name="cancellationToken">Optional token forwarded to the scheduler calls.</param>
    public async Task ScheduleJob(string jobName, string jobGroup, string triggerName, string triggerGroup, DateTimeOffset startTime, CancellationToken cancellationToken = default)
    {
        var job = JobBuilder.Create<DistributedQuartzJob>()
            .WithIdentity(jobName, jobGroup)
            .Build();

        var trigger = TriggerBuilder.Create()
            .WithIdentity(triggerName, triggerGroup)
            .StartAt(startTime)
            .Build();

        // NOTE(review): relies on the factory handing back the same scheduler
        // instance that the hosted service starts — confirm if the factory is
        // ever replaced with a custom one.
        var scheduler = await _schedulerFactory.GetScheduler(cancellationToken);
        await scheduler.ScheduleJob(job, trigger, cancellationToken);
    }
}


/// <summary>
/// Quartz job that, when fired, writes a timestamped message to the console
/// and publishes the same message to the RabbitMQ job queue.
/// </summary>
public class DistributedQuartzJob : IJob
{
    private readonly RabbitMqJobPublisher _rabbitMqJobPublisher;

    /// <param name="rabbitMqJobPublisher">Publisher used to enqueue the job message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="rabbitMqJobPublisher"/> is null.</exception>
    public DistributedQuartzJob(RabbitMqJobPublisher rabbitMqJobPublisher)
    {
        _rabbitMqJobPublisher = rabbitMqJobPublisher ?? throw new ArgumentNullException(nameof(rabbitMqJobPublisher));
    }

    /// <summary>Logs and publishes the "executed at" message.</summary>
    /// <param name="context">Execution context supplied by Quartz (unused).</param>
    /// <exception cref="JobExecutionException">Publishing to the queue failed.</exception>
    public Task Execute(IJobExecutionContext context)
    {
        var message = "DistributedJob executed at " + DateTime.Now;
        Console.WriteLine(message);

        try
        {
            _rabbitMqJobPublisher.PublishJob(message);
        }
        catch (Exception ex)
        {
            // Quartz expects jobs to report failures as JobExecutionException
            // so the scheduler and its listeners see a well-defined failure.
            throw new JobExecutionException("Failed to publish job message to the queue.", ex);
        }

        return Task.CompletedTask;
    }
}

RabbitMqJobPublisher.cs

using Newtonsoft.Json;

using RabbitMQ.Client;

using System.Text;

using DistributedScheduler.Jobs;


namespace ManagerNode

{

    public class TypedObject

    {

        public required string Type { get; set; }

        public required object Data { get; set; }

    }


    public class RabbitMqJobPublisher

    {

        private readonly IConnection _connection;

        private readonly IModel _channel;


        public RabbitMqJobPublisher()

        {

            var factory = new ConnectionFactory() { HostName = "localhost" };

            _connection = factory.CreateConnection();

            _channel = _connection.CreateModel();


            _channel.QueueDeclare(queue: "job_queue",

                                 durable: true,

                                 exclusive: false,

                                 autoDelete: false,

                                 arguments: null);

        }


        public void PublishJob(IDistributedJob job)

        {

            var message = JsonConvert.SerializeObject(job, Formatting.Indented);

            var body = Encoding.UTF8.GetBytes(message);


            _channel.BasicPublish(exchange: "",

                                 routingKey: "job_queue",

                                 basicProperties: null,

                                 body: body);

        }


        public void PublishJob(string jobString)

        {

            var message = jobString;

            var body = Encoding.UTF8.GetBytes(message);


            _channel.BasicPublish(exchange: "",

                                 routingKey: "job_queue",

                                 basicProperties: null,

                                 body: body);

        }

    }

}


JobController.cs


using DistributedScheduler.Jobs;
using Microsoft.AspNetCore.Mvc;

namespace ManagerNode.Controllers
{
    [ApiController]
    [Route("api/manager/job")]
    public class JobController : ControllerBase
    {
        private readonly JobSchedulerService _jobSchedulerService;

        public JobController(JobSchedulerService jobSchedulerService)
        {
            _jobSchedulerService = jobSchedulerService;
        }

        [HttpPost]
        public async Task<IActionResult> SubmitJob([FromBody] DistributedJob job)
        {

            if (_jobSchedulerService is null)
                throw new ArgumentNullException(nameof(_jobSchedulerService));

            await _jobSchedulerService.ScheduleJob(job.Name, job.Name, job.Name, job.Name, new DateTimeOffset(DateTime.Now.AddMinutes(1)));

            return Ok(new { job.Id, Status = "Job submitted and published to queue successfully" });
        }
    }

}

ManagerController.cs

using Microsoft.AspNetCore.Mvc;

/// <summary>Health and status endpoints of the manager node.</summary>
[ApiController]
[Route("api/manager")]
public class ManagerController : ControllerBase
{
    /// <summary>Returns a health summary; the counters are fixed example values.</summary>
    [HttpGet("health")]
    public IActionResult Health() =>
        Ok(new
        {
            Status = "Healthy",
            PendingTasks = 5, // Example value
            ConnectedRunners = 10 // Example value
        });

    /// <summary>Returns a fixed textual status message.</summary>
    [HttpGet("status")]
    public IActionResult Status()
    {
        const string summary = "Manager is operational with all systems functioning.";
        return Ok(summary);
    }
}

Runner

startup.cs

using Microsoft.AspNetCore.Builder;

using Microsoft.Extensions.DependencyInjection;

using Microsoft.Extensions.Hosting;


/// <summary>
/// Startup for the runner node: registers MVC controllers and the queue
/// consumer, wires up routing, and creates the consumer at startup.
/// </summary>
public class Startup
{
    /// <summary>Registers the runner's services with the DI container.</summary>
    /// <param name="services">Service collection to populate.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();
        services.AddSingleton<JobConsumerService>(); // Register JobConsumerService
    }

    /// <summary>Builds the HTTP request pipeline and instantiates the job consumer.</summary>
    /// <param name="app">Application builder.</param>
    /// <param name="env">Hosting environment; enables the developer exception page in Development.</param>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers();
        });

        // The consumer starts listening in its constructor, so resolving it
        // once is what starts job consumption. GetRequiredService fails loudly
        // if the registration is missing, where GetService would return null
        // and leave the runner silently idle.
        _ = app.ApplicationServices.GetRequiredService<JobConsumerService>();
    }
}


program.cs

using Microsoft.AspNetCore.Hosting;

using Microsoft.Extensions.Hosting;


/// <summary>Entry point of the runner node.</summary>
public class Program
{
    /// <summary>Builds the host and runs it until shutdown.</summary>
    public static void Main(string[] args)
    {
        var host = CreateHostBuilder(args).Build();
        host.Run();
    }

    /// <summary>Creates the default host builder with <see cref="Startup"/> as the web startup class.</summary>
    public static IHostBuilder CreateHostBuilder(string[] args)
    {
        var builder = Host.CreateDefaultBuilder(args);
        return builder.ConfigureWebHostDefaults(webBuilder => webBuilder.UseStartup<Startup>());
    }
}

RunnerController.cs

using Microsoft.AspNetCore.Mvc;


namespace RunnerNode.Controllers

{

    [ApiController]

    [Route("api/runner")]

    public class RunnerController : ControllerBase

    {

        [HttpGet("health")]

        public IActionResult GetHealth()

        {

            // Implement health check logic

            return Ok("Runner Node is healthy");

        }


        [HttpGet("status")]

        public IActionResult GetStatus()

        {

            // Implement status retrieval logic

            return Ok("Status: Processing jobs");

        }

    }


}

JobConsumerService.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

/// <summary>
/// Consumes job messages from the durable "job_queue" queue on the RabbitMQ
/// broker at localhost and processes them one at a time. Consumption starts
/// in the constructor.
/// </summary>
public class JobConsumerService : IDisposable
{
    private const string QueueName = "job_queue";

    private readonly IConnection _connection;
    private readonly IModel _channel;

    /// <summary>
    /// Connects to the broker, declares the queue and registers the consumer.
    /// </summary>
    public JobConsumerService()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.QueueDeclare(queue: QueueName,
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);

        // Hand this runner one unacknowledged message at a time so pending
        // jobs stay on the broker for other runners.
        _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            if (ProcessJob(message))
            {
                _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            else
            {
                // A job that failed is not re-queued, so a message that
                // always fails cannot loop forever.
                _channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
            }
        };

        // Manual acknowledgement: with autoAck the broker forgets a message
        // as soon as it is delivered, so a runner that dies mid-job loses it.
        _channel.BasicConsume(queue: QueueName,
                             autoAck: false,
                             consumer: consumer);
    }

    /// <summary>Closes the channel and the connection.</summary>
    public void Dispose()
    {
        _channel.Dispose();
        _connection.Dispose();
    }

    /// <summary>Processes one job message.</summary>
    /// <param name="jobData">Message text received from the queue.</param>
    /// <returns>True when processing completed, false when it threw.</returns>
    private bool ProcessJob(string jobData)
    {
        try
        {
            Console.WriteLine($"Processing job: {jobData}");
            // Implement your job processing logic here
            Console.WriteLine($"Job processed successfully: {jobData}. Completed on {DateTime.Now}");
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error processing job: {ex.Message}");
            // Handle exception (e.g., retry logic)
            return false;
        }
    }

}


Distributed Jobs Class Lib

DistributedJob.cs

/// <summary>
/// Concrete job description implementing <see cref="IDistributedJob"/>.
/// Every member except <see cref="CreatedAt"/> is declared <c>required</c>,
/// so it must be supplied when an instance is created.
/// </summary>
public class DistributedJob : IDistributedJob
{
    /// <summary>Identifier of the job.</summary>
    public required string Id { get; set; }
    /// <summary>Name of the job.</summary>
    public required string Name { get; set; }
    /// <summary>Free-text description of the job.</summary>
    public required string Description { get; set; }
    /// <summary>Type name associated with the job. NOTE(review): expected format is not defined here — confirm with the consumer of this field.</summary>
    public required string TypeName { get; set; }

    /// <summary>Status text of the job. NOTE(review): the set of valid values is not defined here.</summary>
    public required string Status { get; set; }
    /// <summary>Schedule expression for the job. NOTE(review): presumably a cron expression; the expected dialect is not defined here.</summary>
    public required string CronSchedule { get; set; }
    /// <summary>Creation time of the job; may be null.</summary>
    public DateTime? CreatedAt { get; set; }
}

/// <summary>
/// Contract for a job description: identity, descriptive text, type name,
/// status, schedule and an optional creation time.
/// </summary>
public interface IDistributedJob
{
    /// <summary>Identifier of the job.</summary>
    string Id { get; set; }
    /// <summary>Name of the job.</summary>
    string Name { get; set; }
    /// <summary>Free-text description of the job.</summary>
    string Description { get; set; }
    /// <summary>Type name associated with the job.</summary>
    string TypeName { get; set; }
    /// <summary>Status text of the job.</summary>
    string Status { get; set; }
    /// <summary>Schedule expression for the job.</summary>
    string CronSchedule { get; set; }
    /// <summary>Creation time of the job; may be null.</summary>
    DateTime? CreatedAt { get; set; }
}


Run RabbitMQ with Docker; it is easier than installing it on your local machine.

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

Postman Collection

{
"info": {
"_postman_id": "108dded1-e54e-4999-974a-4bb8353b8da1",
"name": "Distributed Scheduler",
"schema": "https://schema.getpostman.com/json/collection/v2.0.0/collection.json",
"_exporter_id": "15322780"
},
"item": [
{
"name": "Manager",
"item": [
{
"name": "Job",
"item": [
{
"name": "New Request",
"request": {
"method": "POST",
"header": [],
"body": {
"mode": "raw",
"raw": "{\r\n    \"Id\": \"acfd5c51-fc47-4f26-8fb1-0c5253ef9e65\",\r\n    \"Name\": \"Job 1\",\r\n    \"Description\": \"Job One\",\r\n    \"status\": \"pending\",\r\n    \"TypeName\": \"DistributedJob, DistributedScheduler.Jobs\",\r\n    \"CronSchedule\": \"* * * * \"\r\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": "http://localhost:5082/api/manager/job"
},
"response": []
}
]
},
{
"name": "Health",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5082/api/manager/health"
},
"response": []
},
{
"name": "Status",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5082/api/manager/status"
},
"response": []
}
]
},
{
"name": "Runner",
"item": [
{
"name": "Health",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5028/api/runner/health"
},
"response": []
},
{
"name": "Status",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5028/api/runner/status"
},
"response": []
}
]
}
]
}