
etl.net's Introduction

Etl.Net

ETL.NET: go to the full documentation

🚨 I need some help!!! 🚑

I need some serious help with the following subjects:

  • 📚 Documentation. The ETL.NET website needs to be completed. It covers the essentials to get started and get in touch, but a large number of important features are not documented there.
  • ✔️ Unit tests. Proper test coverage is the only real way to be taken seriously when it comes to selecting the right open source library. At one point, some unit tests existed for the reactive engine at the core of the library (Paillave.EtlNet.Core), but I had to decommission them as part of the V2 release.

📩 Anybody keen to participate in the project in these areas is very welcome to contact me at [email protected].

Presentation

Implementation of a mass processing engine to be used in a similar way to LINQ, with every SSIS feature and much more. The reactive approach behind this engine ensures parallelized multi-stream processing, high performance, and a low memory footprint, even with millions of rows to process.

ETL.NET is fully written in .NET for multi-platform usage and straightforward integration into any application.

Extending it takes 5 minutes... literally.

Packages (all available on NuGet)

  • Paillave.EtlNet.Core
  • Paillave.EtlNet.Autofac
  • Paillave.EtlNet.Dropbox
  • Paillave.EtlNet.EntityFrameworkCore
  • Paillave.EtlNet.ExcelFile
  • Paillave.EtlNet.ExecutionToolkit
  • Paillave.EtlNet.FileSystem
  • Paillave.EtlNet.FromConfigurationConnectors
  • Paillave.EtlNet.Ftp
  • Paillave.EtlNet.Mail
  • Paillave.EtlNet.GraphApi
  • Paillave.EtlNet.Sftp
  • Paillave.EtlNet.SqlServer
  • Paillave.EtlNet.TextFile
  • Paillave.EtlNet.Bloomberg
  • Paillave.EtlNet.XmlFile
  • Paillave.EtlNet.Zip
  • Paillave.EtlNet.Pdf
  • Paillave.EntityFrameworkCoreExtension

Examples

Unzip it, read it, save it, report it

Read all zip files from a folder, unzip the CSV files inside, parse them, exclude duplicates, upsert them into the database, and report the new or pre-existing Id corresponding to each email.

dotnet new console -o SimpleTutorial
cd SimpleTutorial
dotnet add package Paillave.EtlNet.Core
dotnet add package Paillave.EtlNet.FileSystem
dotnet add package Paillave.EtlNet.Zip
dotnet add package Paillave.EtlNet.TextFile
dotnet add package Paillave.EtlNet.SqlServer
using System;
using System.Threading.Tasks;
using Paillave.Etl.Core;
using Paillave.Etl.FileSystem;
using Paillave.Etl.Zip;
using Paillave.Etl.TextFile;
using Paillave.Etl.SqlServer;
using System.Data.SqlClient;
using System.Linq;

namespace SimpleTutorial
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
            processRunner.DebugNodeStream += (sender, e) => { /* place a conditional breakpoint here for debug */ };
            using (var cnx = new SqlConnection(args[1]))
            {
                cnx.Open();
                var executionOptions = new ExecutionOptions<string>
                {
                    Resolver = new SimpleDependencyResolver().Register(cnx),
                };
                var res = await processRunner.ExecuteAsync(args[0], executionOptions);
                Console.Write(res.Failed ? "Failed" : "Succeeded");
                if (res.Failed)
                    Console.Write($"{res.ErrorTraceEvent.NodeName}({res.ErrorTraceEvent.NodeTypeName}):{res.ErrorTraceEvent.Content.Message}");
            }
        }
        private static void DefineProcess(ISingleStream<string> contextStream)
        {
            contextStream
                .CrossApplyFolderFiles("list all required files", "*.zip", true)
                .CrossApplyZipFiles("extract files from zip", "*.csv")
                .CrossApplyTextFile("parse file", FlatFileDefinition.Create(i => new Person
                {
                    Email = i.ToColumn("email"),
                    FirstName = i.ToColumn("first name"),
                    LastName = i.ToColumn("last name"),
                    DateOfBirth = i.ToDateColumn("date of birth", "yyyy-MM-dd"),
                    Reputation = i.ToNumberColumn<int?>("reputation", ".")
                }).IsColumnSeparated(','))
                .Distinct("exclude duplicates based on the Email", i => i.Email)
                .SqlServerSave("upsert using Email as key and ignore the Id", o => o
                    .ToTable("dbo.Person")
                    .SeekOn(p => p.Email)
                    .DoNotSave(p => p.Id))
                .Select("define row to report", i => new { i.Email, i.Id })
                .ToTextFileValue("write summary to file", "report.csv", FlatFileDefinition.Create(i => new
                {
                    Email = i.ToColumn("Email"),
                    Id = i.ToNumberColumn<int>("new or existing Id", ".")
                }).IsColumnSeparated(','))
                .WriteToFile("save log file", i => i.Name);
        }
        private class Person
        {
            public int Id { get; set; }
            public string Email { get; set; }
            public string FirstName { get; set; }
            public string LastName { get; set; }
            public DateTime DateOfBirth { get; set; }
            public int? Reputation { get; set; }
        }
    }
}
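
Given how the arguments are read above (args[0] is the folder to scan, args[1] is the SQL Server connection string), the process can presumably be launched with something like: dotnet run -- <folder to scan> "<connection string>".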

Run it, debug it, track it, log it

Execute an ETL process, debug it by tracking debug events with the IDE debugger, catch execution events, and log them into a database.

using System;
using System.Threading.Tasks;
using Paillave.Etl.Core;
using Paillave.Etl.FileSystem;
using Paillave.Etl.Zip;
using Paillave.Etl.TextFile;
using Paillave.Etl.SqlServer;
using System.Data.SqlClient;

namespace SimpleTutorial
{
  class Program
  {
    static async Task Main(string[] args)
    {
      var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
      processRunner.DebugNodeStream += (sender, e) => { /* PLACE A CONDITIONAL BREAKPOINT HERE FOR DEBUG */ };
      using (var cnx = new SqlConnection(args[1]))
      {
        cnx.Open();
        var executionOptions = new ExecutionOptions<string>
        {
          Resolver = new SimpleDependencyResolver().Register(cnx),
          TraceProcessDefinition = DefineTraceProcess,
          // UseDetailedTraces = true // activate only if per row traces are meant to be caught
        };
        var res = await processRunner.ExecuteAsync(args[0], executionOptions);
        Console.Write(res.Failed ? "Failed" : "Succeeded");
        if (res.Failed)
          Console.Write($"{res.ErrorTraceEvent.NodeName}({res.ErrorTraceEvent.NodeTypeName}):{res.ErrorTraceEvent.Content.Message}");
      }
    }
    private static void DefineProcess(ISingleStream<string> contextStream)
    {
      // TODO: define your ETL process here
    }
    private static void DefineTraceProcess(IStream<TraceEvent> traceStream, ISingleStream<string> contentStream)
    {
      traceStream
        .Where("keep only summary of node and errors", i => i.Content is CounterSummaryStreamTraceContent || i.Content is UnhandledExceptionStreamTraceContent)
        .Select("create log entry", i => new ExecutionLog
          {
            DateTime = i.DateTime,
            ExecutionId = i.ExecutionId,
            EventType = i.Content switch
            {
              CounterSummaryStreamTraceContent => "EndOfNode",
              UnhandledExceptionStreamTraceContent => "Error",
              _ => "Unknown"
            },
            Message = i.Content switch
            {
              CounterSummaryStreamTraceContent counterSummary => $"{i.NodeName}: {counterSummary.Counter}",
              UnhandledExceptionStreamTraceContent unhandledException => $"{i.NodeName}({i.NodeTypeName}): [{unhandledException.Level.ToString()}] {unhandledException.Message}",
              _ => "Unknown"
            }
          })
        .SqlServerSave("save traces", o => o.ToTable("dbo.ExecutionTrace"));
    }
    private class ExecutionLog
    {
      public DateTime DateTime { get; set; }
      public Guid ExecutionId { get; set; }
      public string EventType { get; set; }
      public string Message { get; set; }
    }
  }
}

Normalize it

Dispatch rows from a flat file into several tables to normalize data thanks to the correlation mechanism.

private static void DefineProcess(ISingleStream<string> contextStream)
{
  var rowStream = contextStream
    .CrossApplyFolderFiles("list all required files", "*.csv", true)
    .CrossApplyTextFile("parse file", FlatFileDefinition.Create(i => new
    {
      Author = i.ToColumn("author"),
      Email = i.ToColumn("email"),
      TimeSpan = i.ToDateColumn("timestamp", "yyyyMMddHHmmss"),
      Category = i.ToColumn("category"),
      Link = i.ToColumn("link"),
      Post = i.ToColumn("post"),
      Title = i.ToColumn("title"),
    }).IsColumnSeparated(','))
    .SetForCorrelation("set correlation for row");

  var authorStream = rowStream
    .Distinct("remove author duplicates based on emails", i => i.Email)
    .Select("create author instance", i => new Author { Email = i.Email, Name = i.Author })
    .EfCoreSaveCorrelated("save or update authors", o => o
      .SeekOn(i => i.Email)
      .AlternativelySeekOn(i => i.Name));

  var categoryStream = rowStream
    .Distinct("remove category duplicates", i => i.Category)
    .Select("create category instance", i => new Category { Code = i.Category, Name = i.Category })
    .EfCoreSaveCorrelated("insert categories if doesn't exist, get it otherwise", o => o
      .SeekOn(i => i.Code)
      .DoNotUpdateIfExists());

  var postStream = rowStream
    .CorrelateToSingle("get related category", categoryStream, (l, r) => new { Row = l, Category = r })
    .CorrelateToSingle("get related author", authorStream, (l, r) => new { l.Row, l.Category, Author = r })
    .Select("create post instance", i => string.IsNullOrWhiteSpace(i.Row.Post)
      ? new LinkPost
      {
        AuthorId = i.Author.Id,
        CategoryId = i.Category.Id,
        DateTime = i.Row.TimeSpan,
        Title = i.Row.Title,
        Url = new Uri(i.Row.Link)
      } as Post
      : new TextPost
      {
        AuthorId = i.Author.Id,
        CategoryId = i.Category.Id,
        DateTime = i.Row.TimeSpan,
        Title = i.Row.Title,
        Text = i.Row.Post
      })
    .EfCoreSaveCorrelated("save or update posts", o => o
      .SeekOn(i => new { i.AuthorId, i.DateTime }));
}
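
For reference, here is a minimal sketch of the entity classes this snippet assumes; the names and shapes are inferred from the pipeline above, and the actual EF Core configuration (DbContext registration, inheritance mapping for Post) is not shown:

// Entity sketch inferred from the code above; adjust to the actual model.
public class Author
{
    public int Id { get; set; }
    public string Email { get; set; }
    public string Name { get; set; }
}
public class Category
{
    public int Id { get; set; }
    public string Code { get; set; }
    public string Name { get; set; }
}
public abstract class Post
{
    public int Id { get; set; }
    public int AuthorId { get; set; }
    public int CategoryId { get; set; }
    public DateTime DateTime { get; set; }
    public string Title { get; set; }
}
public class LinkPost : Post
{
    public Uri Url { get; set; }
}
public class TextPost : Post
{
    public string Text { get; set; }
}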

etl.net's People

Contributors

cyberop5, dependabot[bot], francisgauthier1, hugovg, inspironix, jduchateau, paillave, pm7y, viktorasmickunas


etl.net's Issues

How to get the header/col datatype during parsing a CSV file

In my case, the source CSV data file and its header content/types change often -- they are basically unknown. While parsing, or before parsing, I first need to discover the data types of the data in the columns.

I just need to load and save the data into a SQL table based on whatever data and data types there are in the CSV file. Can I load the CSV data and data types into the DB automatically with Etl.Net? I saw your expando example but was a little lost on how to achieve such a high-level task.

For example, as a high-level algorithm, how would I achieve this with your library?

// Pseudocode: scan the first 9 rows to discover the data types of the CSV file columns
var etlParsedFile = ETL.Paillave.Open/parseCSV(someUnknownData.csv).ScanRows(9)
foreach (var col in etlParsedFile)
    new dataTable.addColumn(etlParsedFile.GetNextHeaderCol)
CreateNewSqlServerTable
LoadDataToSqlTable??

thanks

Running a stored procedure only once after all records in a CSV file have been saved to the database

Let's say we have a data pipeline like this:

        var stream = contextStream
            .ToSqlCommand("Create tables", Queries.CreateTables)
            .CrossApplyFolderFiles("Get data file", "data.csv", true)
            .CrossApplyTextFile("Parse data", FlatFileDefinition.Create(
                i => new
                {
                    code = i.ToColumn(0),
                    name = i.ToColumn(1)
                }).IsColumnSeparated(','))
            .SqlServerSave("Populate data table", o => o
                .SeekOn(i => code)
                .ToTable("[dbo].[some_table]")
            )
	   .ToSqlCommand("Run a stored procedure", Queries.RunStoredProcedure);

A CSV is parsed, its records are saved to a table on a SQL Server database and then I want to run a stored procedure which depends on that data.

If I run it and the CSV file contains 5 rows, the stored procedure is run 5 times, but I only need to run it once.

I tried to create a second data pipeline to run the stored procedure like this:

        var stream = contextStream
            .ToSqlCommand("Create tables", Queries.CreateTables)
            .CrossApplyFolderFiles("Get data file", "data.csv", true)
            .CrossApplyTextFile("Parse data", FlatFileDefinition.Create(
                i => new
                {
                    code = i.ToColumn(0),
                    name = i.ToColumn(1)
                }).IsColumnSeparated(','))
            .SqlServerSave("Populate data table", o => o
                .SeekOn(i => code)
                .ToTable("[dbo].[some_table]")
            );

        contextStream
            .ToSqlCommand("Run a stored procedure", Queries.RunStoredProcedure);

but when I execute it, the stored procedure is not run after the whole CSV import is complete and the rows are stored in the table; it runs somewhere in between, so the stored procedure can't get the data it needs since it has not been saved yet.

How can I signal to the pipeline that I need to run the stored procedure just once?
I tried to create a separate DefineProcess method for running the SP, but that seems really clumsy.

What would you advise?
Thank you.
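
One possible direction, shown here only as an untested sketch: collapse the saved rows into a single event before issuing the command, for instance with the ToList operator mentioned further down this page, assuming ToSqlCommand can be chained onto the resulting single-event stream:

        // Untested sketch: reduce the saved rows to one event, then issue the command once.
        var savedStream = contextStream
            .ToSqlCommand("Create tables", Queries.CreateTables)
            .CrossApplyFolderFiles("Get data file", "data.csv", true)
            .CrossApplyTextFile("Parse data", FlatFileDefinition.Create(
                i => new
                {
                    code = i.ToColumn(0),
                    name = i.ToColumn(1)
                }).IsColumnSeparated(','))
            .SqlServerSave("Populate data table", o => o
                .SeekOn(i => i.code)
                .ToTable("[dbo].[some_table]"));

        savedStream
            .ToList("wait until all rows are saved")
            .ToSqlCommand("Run a stored procedure once", Queries.RunStoredProcedure);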

How to read XML file

Discussed in #388

Originally posted by LordBigDuck November 10, 2022
Hello, I have gone through the documentation and source code but didn't manage to write running code to read an XML file. Could you provide some samples?

EfCoreSelect

Hi,

is there a way to use the select in batches without loading all the data from the server into memory at once?

Warehousing - copy data from one database to another

Hi, can you read data from a source DB and copy it to another, target DB? I've read and tried the examples in the documentation, but reading and saving always happen inside the same database, even though the documentation mentions two args parameters.

Please provide example code if it's possible.

[Question] Working with FileStream?

Hi. The library looks great.

Is it possible to provide a FileStream myself (from an ASP.NET Core action) and process it?
And is it possible to parse JSON files?
By the way, are there any pitfalls in using this library directly in an ASP.NET Core web app with EF Core?
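
For the first point, a pattern shown in a later issue on this page ("StreamProcessRunner is holding file in memory") wraps an externally provided stream with FileValue.Create; a minimal, unverified sketch assuming "file" is an IFormFile received by an ASP.NET Core action:

// Unverified sketch based on the FileValue.Create usage shown later on this page.
await StreamProcessRunner.CreateAndExecuteAsync(file, rootStream => rootStream
    .Select("wrap uploaded stream", i => FileValue.Create(i.OpenReadStream(), i.FileName, "Posted to API"))
    .CrossApplyTextFile("parse file", FlatFileDefinition.Create(c => new
    {
        Email = c.ToColumn("email")
    }).IsColumnSeparated(','))
    .Do("handle row", r => Console.WriteLine(r.Email)));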

SFTP connections not working without trailing slashes in folder paths

When trying to use the SFTP provider to get files, I ran into a possible bug: SftpFileValue uses Path.Combine in its GetContent method to build the file path passed to the SFTP server. However, if there is no trailing path delimiter (e.g. Path.Combine("SomeRoot/content", "filename.txt")), Path.Combine uses the default separator of the machine it is running on. This can lead to a Linux SFTP server receiving a file path with mixed forward and back slashes, and Renci.SshNet spits out a 'No such file' exception. That sent me down a lot of wrong turns, since I assumed there was something wrong with the SFTP server or the authentication, as the file clearly existed.

There is an easy workaround: make sure there are trailing slashes in the path. But then Path.Combine feels redundant in SftpFileValue, because it is just concatenating the strings together at that point, and the error message as it stands is not helpful for tracking down the issue (especially since the file path isn't included in the exception, and the bits of path on their own seem sensible when passing them in; indeed, they work to get a list of the files at that location!)
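
To illustrate the behaviour described above (this is standard .NET behaviour, not specific to this library):

// On Windows, Path.Combine joins the parts with '\', which a Linux SFTP server rejects.
var remotePath = System.IO.Path.Combine("SomeRoot/content", "filename.txt");
// remotePath is "SomeRoot/content\filename.txt" when run on Windows,
// and "SomeRoot/content/filename.txt" when run on Linux.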

Reusing DB context issue with large collections

Hi all,

I've been using the EfCoreSave operator with SaveMode.EntityFrameworkCore option enabled as I'm planning to write my data to Postgres.

I've noticed when dealing with large streams, this operator creates a bottleneck for two reasons:

  1. A single TPT is used to write the data
  2. The DbContext is never disposed of, which leaves a large quantity of unused entities in the change tracker (albeit in the "detached" state).

I've changed a local copy to look as follows:

var ret = args.SourceStream.Observable
    .Chunk(args.BatchSize)
    .Map(i => i.Select(j => (Input: j, Entity: args.GetEntity(j))).ToList())
    .Do(i =>
    {
        var dbContextFactory = args.KeyedConnection == null
            ? this.ExecutionContext.DependencyResolver.Resolve<IDbContextFactory<TMyContext>>()
            : this.ExecutionContext.DependencyResolver.Resolve<IDbContextFactory<TMyContext>>(args.KeyedConnection);
        using (var dbContext = dbContextFactory.CreateDbContext())
        {
            ProcessBatch(i, dbContext, args.BulkLoadMode);
        }
})

MS Docs on DbContextFactory

This allows multiple streams to write to the database at the same time, and crucially, disposes the DbContext after each operation (considerably improving performance/memory overhead in the change tracker).

On my sample of 500k+ entries the import is now silky smooth.

Was there a deliberate design decision behind this?

Question: calling stored procedure which returns values

Reviewed documentation on calling stored procedures and SQL commands, and I would like to know:

  1. Is it possible to retrieve the stored procedure output and pass it further into the pipeline? Based on the documentation: "ToSqlCommand always returns the input events as is.";
  2. If the script used in ToSqlCommand has variable declarations like:
DECLARE @typeVar Char(1) = 'A';
SELECT * FROM dbo.myTable WHERE type = @typeVar;

How could we use @ within it? Isn't it reserved for parameter injection from the upstream? I checked the code, and it's a bit strange to use @ for params, since @ is used in MS SQL for variable definitions and @@ for system variables as well.

Code Sample for reading excel files

Discussed in #375

Originally posted by franks1 August 9, 2022
I have gone through the sample and tutorial of your extraordinary library (Etl.Net). I would like you to assist me with sample code for reading Excel files. Thank you

Substract on non-ordered streams produces an incorrect result

Hi again :-)

It seems that an issue is hiding in the Substract operator on non-ordered streams.

Considering the following example:

var stream1 = contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new { Id = i, Label = $"Label{i}" }));
var stream2 = contextStream
    .CrossApply("create values from enumeration2", ctx => Enumerable.Range(1, 8)
        .Select(i => new { Id = i, Label = $"OtherLabel{i}" }));
var res = stream1.Substract("merge with stream 2", stream2, i => i.Id, i => i.Id)
   .Do("print console", i => Console.WriteLine(i.Label));

The previous code produces only 1 row (Label100) instead of 92 rows.

No issue when using ordered streams and the method
public static IStream<TInLeft> Substract<TInLeft, TInRight, TKey>(this ISortedStream<TInLeft, TKey> leftStream, string name, ISortedStream<TInRight, TKey> rightStream).

(version 2.0.23)

How to compare two streams?

Discussed in #446

Originally posted by jignesh-dalal June 9, 2023
Input Stream 1:
ID,Name
1,Danny
2,Fred
3,Sam

Input Stream 2:
ID,name
1,Danny
3,Pamela
4,Fernando

Output:
ID,name,Status
1,Danny,Unchanged
2,Fred,Deleted
3,Pamela,Changed
4,Fernando,New

Can this be achieved using Etl.Net?

.SqlServerSave() support on MS SQL Server versions prior to 2005

.SqlServerSave() currently generates the SQL query for INSERTs/UPDATEs using the OUTPUT INSERTED.* clause, which is not supported by SQL Server versions prior to 2005.

I know that this is legacy stuff, but ETL.NET would be very useful for migrating necessary scenarios from ancient versions like SQL Server 2000.

Could the query be adjusted? It would suffice to execute an additional SELECT query like SELECT * FROM <table> WHERE col1=@val1, col2=@val2, <...> coln=@valn to achieve the behaviour of OUTPUT INSERTED.*. I guess positional arguments would need to be used here as well to support connections from ODBC/OLE DB drivers.

This could probably be implemented by creating a new node, e.g. SqlServer2000SaveStreamNode.cs and adding a new extension method, say .ToSqlServer2000().

SQL transaction

Hi,

Is there a way to do transaction processing in SQL?
Is there a way to skip rows which failed and continue inserting the next rows?

Can you execute the `ExecuteNonQueryAsync` in a transaction when it is provided by the database?


Reason

I tried to execute the Delete (EF Core) operator within a transaction scope, but it throws an error due to this command.
Setting its Transaction property based on the Database should fix this.

var dbCommand = dbCtx.CreateCommand();

Originally posted by @TomatorCZ in #292 (comment)

Raising an error and short-circuiting the execution of remaining operators

In the initial stage of our data processing pipeline, we verify the presence of a predefined set of files. If any of these files are missing, we need to raise an error. Is there a recommended practice for triggering errors, such as invoking a specific method, or is raising an exception the sole approach for handling this situation? Thank you

Getting two database streams from the same database connection in parallel

Hello, nice package you have here for ETL in .NET!
I have a question regarding database streams:

Suppose I need to define two streams for two different tables from the same ODBC database connection:

        var arch1 = contextStream
            .CrossApplySqlServerQuery("first query from source 1", o => o
                    .FromQuery("select * from dbo.carr")
                    .WithMapping(i => new
                    {
                        carr_code = i.ToColumn("carr_code"),
                        carr_name = i.ToColumn("carr_name")
                    })
                , "source1")
            .Select("create row to save source1 from first query", i => new { i.carr_name, i.carr_code });
        
        var arch2 = contextStream
            .CrossApplySqlServerQuery("second query from source 1", o => o
                    .FromQuery("select * from dbo.carr_old")
                    .WithMapping(i => new
                    {
                        carr_code_old = i.ToColumn("carr_code"),
                        carr_name_old = i.ToColumn("carr_name")
                    })
                , "source1")
            .Select("create row to save source1 from second query", i => new { i.carr_name_old, i.carr_code_old });

When executing this I get this error from the ODBC driver:

Unhandled exception. Paillave.Etl.Core.JobExecutionException: Job execution failed
 ---> System.Data.Odbc.OdbcException (0x80131937): ERROR [HY000] [Microsoft][ODBC SQL Server Driver]Connection is busy with results for another hstmt

This implies that the cursor for the connection has to be looped through to completion before being able to get another stream. Is there any way that this could be remedied? The source database is an old SQL Server 2000, so we're forced to use ODBC.
Thank you.

BTW, it works if I output one stream to a file and then define a new stream sequentially like this:

        var arch = contextStream
            .CrossApplySqlServerQuery("query from source 1", o => o
                    .FromQuery("select * from dbo.carr")
                    .WithMapping(i => new
                    {
                        carr_code = i.ToColumn("carr_code"),
                        carr_name = i.ToColumn("carr_name")
                    })
                , "source1")
            .Select("create row to save source1", i => new { i.carr_name, i.carr_code })
            .ToTextFileValue("to file for source1", @"C:\temp\carr_out_arch.csv",
                FlatFileDefinition.Create(f => new { carr_name = f.ToColumn("Name"), carr_code = f.ToColumn("Code") })
                    .IsColumnSeparated('|'))
            .WriteToFile("save to source1 output file", i => i.Name)
            .CrossApplySqlServerQuery("query from source 2", o => o
                    .FromQuery("select * from dbo.carr_old")
                    .WithMapping(i => new
                    {
                        carr_code_old = i.ToColumn("carr_code"),
                        carr_name_old = i.ToColumn("carr_name")
                    })
                , "source1")
            .Select("create row to save source2", i => new { i.carr_name_old, i.carr_code_old })
            .ToTextFileValue("to file for source2", @"C:\temp\carr_out_arch2.csv",
                FlatFileDefinition.Create(f => new { carr_name_old = f.ToColumn("Name"), carr_code_old = f.ToColumn("Code") })
                    .IsColumnSeparated('|'))
            .WriteToFile("save to source2 output file", i => i.Name);

Specify timeout for .CrossApplySqlServerQuery and .ToSqlCommand

Is there a way to specify the SQL command execution timeout for .CrossApplySqlServerQuery and .ToSqlCommand?
I'm trying to run a stored proc using .CrossApplySqlServerQuery and consume the result set it returns, and it fails with a timeout.

As I found out by googling, the default CommandTimeout for SqlConnection and OdbcConnection is 30 seconds, which is not enough.

Is there a way to work around this? How do you solve this in your production environments? I suspect not all queries/stored proc calls can complete in under 30 seconds.

SimpleConsoleExecutionDisplay - documentation not in line with sources, any clues how to make it work? :-)

Hi,

I'm trying to use the SimpleConsoleExecutionDisplay.

The documentation uses a parameterless constructor, but the class actually has no default ctor.
Passing default leads to a NullReferenceException. Passing new() for each parameter seems OK, but I don't know what is expected.

Moreover, the doc says to use the Initialize() method with a structure variable (of type JobDefinitionStructure), but I have no idea where to get that "structure".
I've passed StreamProcessRunner.GetDefinitionStructure(); is that correct?

Thank you in advance :)

Documentation to Deal with dependency injection for Unit tests

Discussed in #385

Originally posted by mlholt03 October 17, 2022
If I am accessing a database using this tool, and I wish to write unit tests for the code I am writing, how can I mock out the database access so that my unit test is not actually attempting to connect to a database? Is there some way to provide a SqlCommandValueProvider instance that simply returns a basic sample set of data without querying a database?

[FtpFileValueProvider] NotImplementedException after execution

Hi,

I'm using the library as shown in the samples:

contextStream.CrossApply("Get Files", new FtpFileValueProvider("SRC", "Solar exports", "files from ftp", connectionParams, providerParams))
        .Do("print files to console", i => Console.WriteLine(i.Name));

File names are printed correctly, but I always get a NotImplementedException at the end of the execution.

It seems to be linked to FileValueProviderBase.TypeName, which explicitly throws a NotImplementedException.
The following change fixed the issue in my local copy:

namespace Paillave.Etl.Core
{
    public abstract class FileValueProviderBase<TConnectionParameters, TProviderParameters> : IFileValueProvider, IValuesProvider<object, IFileValue>
    {
   [...]

        public string TypeName => this.GetType().Name; // throw new NotImplementedException()

Am I using the FTP provider the wrong way, or should this be fixed?

Thank you in advance,
Nicolas

StreamProcessRunner is holding file in memory when I need it not to do that

My user uploads a .csv file and I parse it with Etl.Net. Inside the Do section, I am using a validation package with a specialized rule set for the data. However, when an edited version of the same file is uploaded, the results seem to come from the previous file version. I think the file is being kept in memory. Is there a way with Etl.Net to dispose of the file after the Do section? Thank you!

return StreamProcessRunner.CreateAndExecuteAsync(file,
    t => t
        .Select("Use uploaded file", i => FileValue.Create(i.OpenReadStream(), i.FileName, "Posted to API"))
        .CrossApplyTextFile("parse file", FlatFileDefinition.Create(c => new AddMerchantUploadFileModel()
        {
            // ... Cut out for privacy
        }).IsColumnSeparated(','))
        .Do("validate", f => ValidateMerchantFile(f, addedMerchants, errors))
);

SqlServerSave fails because column names are not enclosed in square brackets

Hi, I am attempting to use SqlServerSave; however, it fails for me because, I believe, internally it is not enclosing each property/column name in square brackets. I have a column called "Key", which is also a reserved SQL Server keyword and must be enclosed in brackets (i.e. [Key]), otherwise the generated SQL statement fails.

Handling multiline text column

How can we handle it if the pipe-separated CSV has a text column containing multiline text? The text delimiter is ".

With the given example in the documentation, it throws the error "could not deserialize value" because of the new line in the text column.

Mysql support

Discussed in #357

Originally posted by lvjoemcintyre July 20, 2022
Are there plans for MySQL support?

I started trying to modify the sqlserver project to support MySQL.
I'm bumping into some trouble understanding how to debug it.

[Injection] error "key value cannot be null" when injecting derived dbcontext

Hi again :-)

Sorry for bothering you so often these last few days, but I've found another glitch, along with its workaround.
I'm only trying to help 😳

I use EF Core to save records in a PostgreSQL DB with .WithMode(SaveMode.EntityFrameworkCore).
My own DataContext derives from Microsoft.EntityFrameworkCore.DbContext.

If I try to inject my data context without explicitly casting it to DbContext, I get an ArgumentNullException with the following message: Value cannot be null. (Parameter 'key')

The issue is thrown by Paillave.Etl.EntityFrameworkCore.EfCoreSaveStreamNode<TInEf, TIn, TOut>.CreateOutputStream:

protected override IStream<TOut> CreateOutputStream(EfCoreSaveArgs<TInEf, TIn, TOut> args)
{
    var ret = args.SourceStream.Observable
        .Chunk(args.BatchSize)
        .Map(i => i.Select(j => (Input: j, Entity: args.GetEntity(j))).ToList())
        .Do(i =>
        {
            /*** Thrown by the following lines => ***/
            var dbContext = args.KeyedConnection == null
                ? this.ExecutionContext.DependencyResolver.Resolve<DbContext>()
                : this.ExecutionContext.DependencyResolver.Resolve<DbContext>(args.KeyedConnection);
            this.ExecutionContext.InvokeInDedicatedThreadAsync(dbContext, () => ProcessBatch(i, dbContext, args.BulkLoadMode)).Wait();
        })
        .FlatMap((i, ct) => PushObservable.FromEnumerable(i, ct))
        .Map(i => args.GetOutput(i.Input, i.Entity));
    return base.CreateUnsortedStream(ret);
}

Explicitly registering the context as DbContext when injecting fixes this issue:

public class DataContext : DbContext   
{
   [...]
}

[...]

using var dc = new DataContext();

var options = new ExecutionOptions<string>
{
   // This works
    Resolver = new SimpleDependencyResolver().Register<DbContext>(dc),
   // This doesn't work
    Resolver = new SimpleDependencyResolver().Register(dc)    
};
var res = await processRunner.ExecuteAsync("", options);

Question: connecting to old SQL server

Is there a way to use a DB connector other than EF Core and SqlClient to access databases? We have issues using any of the implemented ones, because the database we are targeting is SQL Server 2000.

EF Core Spatial Integration

Do you currently allow for spatial joins / lookups as part of the ETL process pipeline? And if not, are you considering support for it?

Buffer operator

Discussed in #433

Originally posted by ctrlaltdan May 16, 2023
Hi, I was wondering if there is any support for a Buffer operator (specifically mirroring the Buffer(timeSpan, count) Rx operator).

The use case is to parse and load chunked output onto a service bus queue.

.ToTextFileValue fails when output is bigger than 2GB

The .ToTextFileValue extension method fails when the resulting output is bigger than 2 GB. We worked around it with .Chunk() and then .Do(), but because of that we lost the elegance of using a file definition.

Is there a more elegant way to resolve this? For ETL processes it is important to be able to extract big data files.

[Question] - Plans to define a DAG to capture the ETL process

Preferably I'm looking for a tool similar to Dagster, Prefect, or Airflow, which are popular tools in the python ecosystem that allow users to express data pipelines in the form of DAGs and have them be recurring, containerizable etc. Are there plans to expose that sort of functionality in this project? Thanks.

Getting in-memory list from the database stream

Hello,
is there a way to get an in-memory list as an instantiated IList<T> from a SQL Server stream and have the database cursor consumed/closed?
At first I thought the ToList() extension method was for this purpose, but it appears it returns an ISingleStream<List<T>>:

        var inMemoryCarrierList = contextStream
            .CrossApplySqlServerQuery("carrier stream", builder => builder
                .FromQuery("select carr_code, carr_name from dbo.carr_old order by carr_code")
                .WithMapping(m => new
                {
                    carr_code = m.ToColumn("carr_code"),
                    carr_name = m.ToColumn("carr_name")
                }), "source1")
            .ToList("in memory carrier list");

If I get a list this way and don't consume the stream (by, say, outputting it to a text file), and then try a query like this on the same ODBC connection:

        var arch1 = contextStream
            .CrossApplySqlServerQuery("first query from source 1", o => o
                    .FromQuery("select * from dbo.carr")
                    .WithMapping(i => new
                    {
                        carr_code = i.ToColumn("carr_code"),
                        carr_name = i.ToColumn("carr_name")
                    })
                , "source1")
            .Select("create row to save source1 from first query", i => new { i.carr_name, i.carr_code });

I get this error:

Unhandled exception. Paillave.Etl.Core.JobExecutionException: Job execution failed
 ---> System.Data.Odbc.OdbcException (0x80131937): ERROR [HY000] [Microsoft][ODBC SQL Server Driver]Connection is busy with results for another hstmt

Question: error path handling

ETL tools like SSIS support error handling by calling/executing routines to handle failures (screenshot omitted).

There is also an event handler for all errors (screenshot omitted).

Are there ways to do similar routines with Etl.Net?

Need to enable MultipleActiveResultSets in the connection string when I have 2 SQL upserts in the same DefineProcess

Hello, I'm working on a synchronization between 2 databases.
In DefineProcess I read data with EF and insert it with the SqlServerSave method.

If I have only one SqlServerSave in DefineProcess, it works fine.
However, if I try to update more than one table with multiple SqlServerSave calls (from different streams),

it throws exceptions like
There is already an open DataReader associated with this Command which must be closed first
Timeout expired. The timeout period elapsed prior to completion of the operation or the server is not responding.

After some research on these exceptions, I ended up setting MultipleActiveResultSets=true on the connection string I'm doing the upserts on.

With the MARS attribute, it now works fine with multiple SqlServerSave calls.

Is there a way to achieve the same thing without setting this attribute?
Or is it supposed to work only with the MARS attribute on?

thank you
